BEAM-5409 (project: Beam)

Beam Java SDK 2.4/2.5 PAssert with CoGroupByKey

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 2.5.0
    • Fix Version/s: 2.4.0, 2.5.0
    • Component/s: testing
    • Labels: None

    Description

      I may be missing something obvious, but I can't get PAssert and TestPipeline to work with CoGroupByKey; without CoGroupByKey, the same test works fine.
      Here is a reference test file that reproduces the issue. I tested with both Beam SDK 2.4 and 2.5.

      (For the record, this was previously posted on Stack Overflow.)

      For comparison, testWorking works as intended, and testBroken has an additional step like this:

      // The following four lines cause an issue.
      PCollectionTuple tuple =
          KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
              .and(inTag2, pc2).apply("CoGroupByKey", CoGroupByKey.<String>create()).apply("Some Merge DoFn",
                  ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2)).withOutputTags(outTag1, TupleTagList.of(outTag2)));
      

      The error I get can be found after the code below.

      Has anyone had a similar issue with TestPipeline before? I haven't tested this extensively yet, but I couldn't find any information on using CoGroupByKey and TestPipeline together. The same code works fine in production for my team; we ran into this issue when we tried to add a few unit tests using TestPipeline and PAssert.

      Any help will be appreciated!

      NOTE: Resolved by adding 'implements Serializable' to the main test class, as shown below. Without it, the test throws the exception listed after the code. I've left the original contents for reference.

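      import java.io.Serializable;
      import org.apache.beam.sdk.coders.KvCoder;
      import org.apache.beam.sdk.coders.StringUtf8Coder;
      import org.apache.beam.sdk.testing.PAssert;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.join.CoGbkResult;
      import org.apache.beam.sdk.transforms.join.CoGroupByKey;
      import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.PCollectionTuple;
      import org.apache.beam.sdk.values.TupleTag;
      import org.apache.beam.sdk.values.TupleTagList;
      import org.junit.Rule;
      import org.junit.Test;

      public class ReferenceTest implements Serializable {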
        @Rule
        public final transient TestPipeline pipe1 = TestPipeline.create();
        @Rule
        public final transient TestPipeline pipe2 = TestPipeline.create();
      
        public static class String2KV extends DoFn<String, KV<String, String>> {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // "key1:value1" -> ["key1", "value1"]
            String[] tokens = c.element().split(":");
            c.output(KV.of(tokens[0], tokens[1]));
          }
        }
      
        public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
          final TupleTag<String> inTag1;
          final TupleTag<String> inTag2;
          final TupleTag<String> outTag2;
      
          public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2, TupleTag<String> outTag2) {
            this.inTag1 = inTag1;
            this.inTag2 = inTag2;
            this.outTag2 = outTag2;
          }
      
          @ProcessElement
          public void processElement(ProcessContext c) {
            String val1 = c.element().getValue().getOnly(inTag1);
            String val2 = c.element().getValue().getOnly(inTag2);
      
            // outTag1 = main output
            // outTag2 = side output
            c.output(outTag2, val1 + "," + val2);
          }
        }
      
        @Test
        public void testWorking() {
          // Create two PCs for test.
          PCollection<String> pc1 =
              pipe1.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
          PCollection<KV<String, String>> pc2 =
              pipe1.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
      
          // Sanity check.
          PAssert.that(pc1).containsInAnyOrder("key1:value1");
          PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));
      
          pipe1.run();
        }
      
        // Disabled as of 2018-07-13.
        // https://stackoverflow.com/questions/51334429/beam-java-sdk-2-4-2-5-passert-with-cogroupbykey
        @Test
        public void testBroken() {
          // Create two PCs for test.
          PCollection<String> pc1 =
              pipe2.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
          PCollection<KV<String, String>> pc2 =
              pipe2.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "value2"))
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
      
          // Sanity check.
          PAssert.that(pc1).containsInAnyOrder("key1:value1");
          PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "value2"));
      
          TupleTag<String> inTag1 = new TupleTag<String>() {
            private static final long serialVersionUID = 1L;
          };
          TupleTag<String> inTag2 = new TupleTag<String>() {
            private static final long serialVersionUID = 1L;
          };
          TupleTag<String> outTag1 = new TupleTag<String>() {
            private static final long serialVersionUID = 1L;
          };
          TupleTag<String> outTag2 = new TupleTag<String>() {
            private static final long serialVersionUID = 1L;
          };
      
          // The following four lines cause an issue.
          PCollectionTuple tuple =
              KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
                  .and(inTag2, pc2).apply("CoGroupByKey", CoGroupByKey.<String>create()).apply("Some Merge DoFn",
                      ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2)).withOutputTags(outTag1, TupleTagList.of(outTag2)));
      
          PAssert.that(tuple.get(outTag1)).empty();
          PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");
      
          pipe2.run();
        }
      }
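
      A possible explanation for why 'implements Serializable' helps, sketched with plain Java serialization and no Beam dependency: the anonymous TupleTag subclasses are created inside an instance method, so each one holds a hidden reference to the enclosing test instance, and serializing a tag also tries to serialize the test class. In the sketch below, Tag, EnclosingTest, and CaptureDemo are hypothetical stand-ins (Tag plays the role of TupleTag). It only illustrates the language behavior and is not Beam's own code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable tag type such as TupleTag; not a Beam class.
class Tag implements Serializable {
  private static final long serialVersionUID = 1L;
}

// Deliberately NOT Serializable, like the original ReferenceTest before the fix.
class EnclosingTest {
  // Anonymous subclass created in an instance method: it carries a hidden
  // reference to the enclosing EnclosingTest instance.
  Tag tagFromInstanceMethod() {
    return new Tag() {
      private static final long serialVersionUID = 1L;
    };
  }

  // Anonymous subclass created in a static context: there is no enclosing instance.
  static Tag tagFromStaticMethod() {
    return new Tag() {
      private static final long serialVersionUID = 1L;
    };
  }
}

class CaptureDemo {
  // Returns true if Java serialization of obj succeeds, false if some object
  // in its graph is not serializable.
  static boolean canSerialize(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(canSerialize(new EnclosingTest().tagFromInstanceMethod()));
    System.out.println(canSerialize(EnclosingTest.tagFromStaticMethod()));
  }
}
```

      If this is what happens in testBroken, creating the tags in a static context (for example as static final fields) might be an alternative to making the test class Serializable. That variant was not tested in this report.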
      

      Here's the error:
       

      java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f
      
          at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
          at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
          at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
          at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
          at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
          at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
          at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
          at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
          at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
          at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
          at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
          at org.junit.rules.RunRules.evaluate(RunRules.java:20)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
          at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
          at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
          at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
          at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
          at java.util.HashMap.writeObject(HashMap.java:1363)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
          at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
          ... 54 more
      
      
      Process finished with exit code 255
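
      The 'Caused by' line in the trace names the class that Java serialization rejected. As a small diagnostic sketch, the same information can be obtained by serializing a suspect object directly and reading the NotSerializableException message, which carries the offending class name. SerializationProbe, Holder, and PlainHelper are hypothetical names used only for illustration; none of them is part of Beam.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper type that does NOT implement Serializable.
class PlainHelper {}

// Hypothetical serializable holder whose map can hide a non-serializable value,
// loosely mirroring the HashMap frames in the trace.
class Holder implements Serializable {
  private static final long serialVersionUID = 1L;
  final Map<String, Object> entries = new HashMap<>();
}

class SerializationProbe {
  // Returns the name of the first class that Java serialization rejects,
  // or null if the whole object graph serializes.
  static String firstNonSerializableClass(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return null;
    } catch (NotSerializableException e) {
      // The exception message is the name of the offending class.
      return e.getMessage();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Holder holder = new Holder();
    holder.entries.put("helper", new PlainHelper());
    System.out.println(firstNonSerializableClass(holder));
  }
}
```

      Running a probe like this on a coder, DoFn, or tag before calling run() may point to the captured object sooner than reading the full pipeline trace.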
      

       

          People

            Assignee: Jason Kuster (jaku)
            Reporter: haden lee (hadenlee)