Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-1149

Side input access fails in direct runner (possibly others too) when input element in multiple windows

Details

    • Bug
    • Status: Resolved
    • P0
    • Resolution: Fixed
    • None
    • 0.4.0
    • runner-core
    • None

    Description

        private static class FnWithSideInputs extends DoFn<String, String> {
          private final PCollectionView<Integer> view;
      
          private FnWithSideInputs(PCollectionView<Integer> view) {
            this.view = view;
          }
      
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element() + ":" + c.sideInput(view));
          }
        }
      
        @Test
        public void testSideInputsWithMultipleWindows() {
          Pipeline p = TestPipeline.create();
      
          MutableDateTime mutableNow = Instant.now().toMutableDateTime();
          mutableNow.setMillisOfSecond(0);
          Instant now = mutableNow.toInstant();
      
          SlidingWindows windowFn =
              SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
          PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
          PCollection<String> res =
              p.apply(Create.timestamped(TimestampedValue.of("a", now)))
                  .apply(Window.<String>into(windowFn))
                  .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
      
          PAssert.that(res).containsInAnyOrder("a:1");
      
          p.run();
        }
      

      This fails with the following exception:

      org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
      
      	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
      	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
      	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
      	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
      	at ....
      Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
      	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
      	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
      

      Attachments

        Issue Links

          Activity

            People

              kenn Kenneth Knowles
              jkff Eugene Kirpichov
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: