Beam / BEAM-2753

File DynamicDestinations side inputs don't work with sharding

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.2.0
    • Component/s: sdk-java-core

    Description

      WriteWithShardingFactory calls PTransformReplacements.getSingletonMainInput: https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java#L74

      However, if the dynamic destinations have a side input, the transform has more than one input and the call fails:

      Exception in thread "main" java.lang.IllegalArgumentException: Got multiple inputs that are not additional inputs for a singleton main input: Avro schema side input/ParMultiDo(Anonymous).out0 [PCollection] and Run read all/Execute queries/ParMultiDo(NaiveSpannerRead).out0 [PCollection]
      at org.apache.beam.runners.direct.repackaged.runners.core.construction.java.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:383)
      at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformReplacements.getSingletonMainInput(PTransformReplacements.java:50)
      at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformReplacements.getSingletonMainInput(PTransformReplacements.java:41)
      at org.apache.beam.runners.direct.WriteWithShardingFactory.getReplacementTransform(WriteWithShardingFactory.java:74)
      at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:540)
      at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:280)
      at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:201)
      at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:169)
      at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
      at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
      at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)

      This is not caught by unit tests because unit tests specify withoutSharding().
      https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java#L644
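
The failure mode above can be modeled in a few lines. This is a simplified, standalone sketch and not the Beam source. Plain strings stand in for Beam's tags and PCollections, and the class name SingletonMainInputSketch and the input names are hypothetical. It assumes, as the error message suggests, that the side input's tag is not among the transform's declared additional inputs, so the check sees two candidate main inputs.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for the main-input check; strings replace Beam's tags and values. */
final class SingletonMainInputSketch {

  /**
   * Returns the single input whose tag is not listed in additionalInputTags.
   * Throws IllegalArgumentException if more than one such input exists, or none.
   */
  static String getSingletonMainInput(
      Map<String, String> inputs, Set<String> additionalInputTags) {
    String mainInput = null;
    for (Map.Entry<String, String> input : inputs.entrySet()) {
      if (additionalInputTags.contains(input.getKey())) {
        continue; // declared as an additional (side) input, so it is skipped
      }
      if (mainInput != null) {
        throw new IllegalArgumentException(
            "Got multiple inputs that are not additional inputs for a singleton main input: "
                + mainInput + " and " + input.getValue());
      }
      mainInput = input.getValue();
    }
    if (mainInput == null) {
      throw new IllegalArgumentException("No main input found in inputs: " + inputs);
    }
    return mainInput;
  }

  public static void main(String[] args) {
    Map<String, String> inputs = new LinkedHashMap<>();
    inputs.put("main", "records");
    inputs.put("side", "schemaView");

    // Side input declared as additional: the main input is found.
    System.out.println(getSingletonMainInput(inputs, Collections.singleton("side")));

    // Side input not declared: both inputs look like main inputs and the check throws.
    try {
      getSingletonMainInput(inputs, Collections.<String>emptySet());
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this model the check passes only when every side input is reported as an additional input. Whether the real fix took that route is not stated in this report.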

      CC: mkhadikov

            People

              Assignee: jkff Eugene Kirpichov
              Reporter: jkff Eugene Kirpichov
              Votes: 0
              Watchers: 2
