Beam / BEAM-7128

When unset, parallelism is unavailable when applying ReplacementTransforms

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.13.0
    • Component/s: runner-flink
    • Labels: None

    Description

      In streaming mode, the Flink Runner applies a ReplacementTransform to set a sharding strategy for Writes. This requires the parallelism of the pipeline to be available up front. However, the replacement transforms are applied before the final parallelism has been determined, which leads to the following error if the parallelism has not been set manually:
       

      Exception in thread "main" java.lang.IllegalArgumentException: Parallelism of a job should be greater than 0. Currently set: {} [-1]
          at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
          at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator$StreamingShardedWriteFactory.getReplacementTransform(FlinkStreamingPipelineTranslator.java:197)
          at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:554)
          at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:290)
          at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:208)
          at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:93)
          at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
          at com.maximilianmichels.WordCount.main(WordCount.java:64)
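
      The ordering problem behind this trace can be illustrated with a small, self-contained sketch. It is a simplified model, not Beam or Flink code: the class ParallelismOrderingSketch, its Options holder, and every method name are hypothetical, and the assumption that the shard count simply equals the parallelism is made up for illustration. It only shows why a step that needs a positive parallelism fails when it runs before the default has been resolved, and why resolving first (or setting the parallelism manually, as the description notes) avoids the failure.

```java
import java.util.function.IntSupplier;

/** Simplified, hypothetical model of the ordering problem; not Beam or Flink code. */
public class ParallelismOrderingSketch {
    /** Sentinel meaning "not set by the user" (the trace above reports -1). */
    public static final int UNSET = -1;

    /** Hypothetical stand-in for pipeline options with a mutable parallelism. */
    public static final class Options {
        public int parallelism = UNSET;
    }

    /** Stand-in for the replacement step, which requires a positive parallelism. */
    public static int chooseShardCount(Options options) {
        if (options.parallelism <= 0) {
            throw new IllegalArgumentException(
                "Parallelism of a job should be greater than 0. Currently set: "
                    + options.parallelism);
        }
        // Assumption for illustration only: one shard per parallel instance.
        return options.parallelism;
    }

    /** Stand-in for determining the final parallelism from an environment default. */
    public static void resolveParallelism(Options options, IntSupplier environmentDefault) {
        if (options.parallelism == UNSET) {
            options.parallelism = environmentDefault.getAsInt();
        }
    }

    /** Problematic order: the replacement runs before the parallelism is resolved. */
    public static int translateReplacingFirst(Options options, IntSupplier environmentDefault) {
        int shards = chooseShardCount(options); // throws if parallelism is still unset
        resolveParallelism(options, environmentDefault);
        return shards;
    }

    /** Safer order: resolve the parallelism, then run the replacement. */
    public static int translateResolvingFirst(Options options, IntSupplier environmentDefault) {
        resolveParallelism(options, environmentDefault);
        return chooseShardCount(options);
    }

    public static void main(String[] args) {
        try {
            translateReplacingFirst(new Options(), () -> 4);
        } catch (IllegalArgumentException e) {
            System.out.println("replace-first failed: " + e.getMessage());
        }
        System.out.println("resolve-first shards: "
            + translateResolvingFirst(new Options(), () -> 4));
    }
}
```

      In this model, calling translateReplacingFirst with an explicitly set parallelism also succeeds, which mirrors the report's observation that the error only appears when the parallelism has not been set manually.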
      

            People

              Assignee: Maximilian Michels (mxm)
              Reporter: Maximilian Michels (mxm)
              Votes: 0
              Watchers: 1

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 50m