Details

    • Sub-task
    • Status: Resolved
    • P2
    • Resolution: Cannot Reproduce
    • 2.5.0
    • Not applicable
    • sdk-java-core

    Description

      When using JDK 10, using a ParDo after a CoGroupByKey seems to create the following exception when executed on local runner:

      Exception in thread "main" org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Invisible parameter type of Main$1 arg0 for public Main$1$DoFnInvoker(Main$1)
          at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
          at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
          at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
          ...
      Caused by: java.lang.IllegalStateException: Invisible parameter type of Main$1 arg0 for public Main$1$DoFnInvoker(Main$1)
          at org.apache.beam.repackaged.beam_sdks_java_core.net.bytebuddy.dynamic.scaffold.InstrumentedType$Default.validated(InstrumentedType.java:925)
          at org.apache.beam.repackaged.beam_sdks_java_core.net.bytebuddy.dynamic.scaffold.MethodRegistry$Default.prepare(MethodRegistry.java:465)
          at org.apache.beam.repackaged.beam_sdks_java_core.net.bytebuddy.dynamic.scaffold.subclass.SubclassDynamicTypeBuilder.make(SubclassDynamicTypeBuilder.java:170)
          ...
      

      This error disappears completely when using JDK 8. Here is a minimal example to reproduce it:

      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.join.CoGbkResult;
      import org.apache.beam.sdk.transforms.join.CoGroupByKey;
      import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.TupleTag;
      
      import java.util.Arrays;
      import java.util.List;
      
      public class Main {
          public static void main(String[] args) {
              PipelineOptions options = PipelineOptionsFactory.create();
              Pipeline p = Pipeline.create(options);
      
              final TupleTag<String> emailsTag = new TupleTag<>();
              final TupleTag<String> phonesTag = new TupleTag<>();
      
              final List<KV<String, String>> emailsList =
                      Arrays.asList(
                              KV.of("amy", "amy@example.com"),
                              KV.of("carl", "carl@example.com"),
                              KV.of("julia", "julia@example.com"),
                              KV.of("carl", "carl@email.com"));
      
              final List<KV<String, String>> phonesList =
                      Arrays.asList(
                              KV.of("amy", "111-222-3333"),
                              KV.of("james", "222-333-4444"),
                              KV.of("amy", "333-444-5555"),
                              KV.of("carl", "444-555-6666"));
      
      
              PCollection<KV<String, String>> emails = p.apply("CreateEmails", Create.of(emailsList));
              PCollection<KV<String, String>> phones = p.apply("CreatePhones", Create.of(phonesList));
      
              PCollection<KV<String, CoGbkResult>> results =
                      KeyedPCollectionTuple.of(emailsTag, emails)
                              .and(phonesTag, phones)
                              .apply(CoGroupByKey.<String>create());
      
              PCollection<String> contactLines =
                      results.apply(
                              ParDo.of(
                                      new DoFn<KV<String, CoGbkResult>, String>() {
                                          @ProcessElement
                                          public void processElement(ProcessContext c) {
                                              KV<String, CoGbkResult> e = c.element();
                                              String name = e.getKey();
                                              Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
                                              Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
                                              String formattedResult = "";
                                              c.output(formattedResult);
                                          }
                                      }));
      
              p.run().waitUntilFinish();
          }
      }

      Attachments

        Issue Links

          Activity

            People

              ŁukaszG Lukasz Gajowy
              mpedersen Mike Pedersen
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: