Project: Beam / Issue: BEAM-3423

Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.2.0, 2.3.0
    • Fix Version/s: 2.4.0
    • Component/s: sdk-java-core
    • Environment: ubuntu16.04, idea, java8

    Description

      My code is as follows:

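      import org.apache.beam.sdk.io.kafka.KafkaIO;
      import org.apache.beam.sdk.transforms.Distinct;
      import org.apache.beam.sdk.transforms.MapElements;
      import org.apache.beam.sdk.transforms.SerializableFunction;
      import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
      import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
      import org.apache.beam.sdk.transforms.windowing.FixedWindows;
      import org.apache.beam.sdk.transforms.windowing.Window;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.TypeDescriptors;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import org.joda.time.Duration;
      import org.joda.time.Instant;

      // Inside main(): `pipeline` is a Pipeline on the DirectRunner, `topic` is the Kafka topic name.
      pipeline
              // Read (key, value) records from Kafka
              .apply("Read from kafka",
                      KafkaIO.<String, String>read()
                              .withBootstrapServers("localhost:9092")
                              .withTopic(topic)
                              .withKeyDeserializer(StringDeserializer.class)
                              .withValueDeserializer(StringDeserializer.class)
                              .withoutMetadata())
              // 10 s fixed windows, with an early firing 5 s after the first element in a pane
              .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
                      .triggering(AfterWatermark.pastEndOfWindow()
                              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                      .plusDelayOf(Duration.standardSeconds(5))))
                      .discardingFiredPanes()
                      .withAllowedLateness(Duration.ZERO))
              // Works fine:
              // .apply(Distinct.create())
              // Oops! -> CoderException: cannot encode a null KV
              .apply(Distinct.withRepresentativeValueFn(new Val())
                      .withRepresentativeType(TypeDescriptors.strings()))
              // Print the processing time and each element that reaches this step
              .apply(MapElements.into(TypeDescriptors.nulls())
                      .via(input -> {
                          System.out.println(Instant.now());
                          System.out.println(input);
                          return null;
                      }));
      pipeline.run();

      // Nested class of the enclosing class: uses the Kafka record value as the representative value
      private static class Val implements SerializableFunction<KV<String, String>, String> {
          @Override
          public String apply(KV<String, String> input) {
              return input.getValue();
          }
      }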
      

      I then wrote these words to Kafka:
      word1
      // 10 seconds later
      word2

      Then I got the following exception:

      begin
      2018-01-06T11:18:52.971Z
      KV{null, a}
      Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
      	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
      	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
      	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
      	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
      	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
      	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
      	at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
      Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
      	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
      	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
      	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
      Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
      	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
      	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
      	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
      	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
      	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
      	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
      	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
      	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
      	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
      	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
      	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
      	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
      	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
      	at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
      	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
      	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
      	at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
      	at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:748)
      
      

      If I use .apply(Distinct.create()) instead, it works fine.
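      To make the expected behavior concrete, here is a plain-Java sketch (no Beam dependency) of the documented semantics of the two transforms within a single window: Distinct.create() keeps one copy of each distinct element, while Distinct.withRepresentativeValueFn(fn) keeps one element for each distinct fn(element), here the Kafka value as in Val. The class DistinctSemanticsSketch and its methods kv, distinct and distinctBy are hypothetical illustrations, not Beam's implementation, and Map.Entry stands in for Beam's KV. Under these semantics every output is one of the input elements, so a null KV should never be emitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of Distinct's documented semantics for ONE window.
// Map.Entry stands in for Beam's KV; this is not Beam's implementation.
class DistinctSemanticsSketch {

  // Builds a (key, value) pair; the key may be null, like a keyless Kafka record.
  static Map.Entry<String, String> kv(String key, String value) {
    return new SimpleEntry<>(key, value);
  }

  // Like Distinct.create(): one copy of each distinct element, in first-seen order.
  static <T> List<T> distinct(List<T> window) {
    return new ArrayList<>(new LinkedHashSet<>(window));
  }

  // Like Distinct.withRepresentativeValueFn(fn): one element per distinct fn(element).
  // Beam may keep any element of each group; this sketch keeps the first one seen.
  static <T, IdT> List<T> distinctBy(List<T> window, Function<T, IdT> fn) {
    Map<IdT, T> firstByRepresentative = new LinkedHashMap<>();
    for (T element : window) {
      firstByRepresentative.putIfAbsent(fn.apply(element), element);
    }
    return new ArrayList<>(firstByRepresentative.values());
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> window = new ArrayList<>();
    window.add(kv(null, "word1"));
    window.add(kv(null, "word1"));
    window.add(kv("k", "word1"));
    window.add(kv(null, "word2"));

    // Whole-element equality: (null, word1) and (k, word1) are different elements.
    System.out.println(distinct(window)); // prints [null=word1, k=word1, null=word2]
    // Representative value = the record value, as in the report's Val function.
    System.out.println(distinctBy(window, e -> e.getValue())); // prints [null=word1, null=word2]
  }
}
```

      The null keys mirror the "KV{null, a}" line in the output above: records written to Kafka without a key are read with a null key, which is a valid element and is not itself the null KV the coder rejects.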

            People

              Assignee: Kenneth Knowles (kenn)
              Reporter: huangjianhuang
              Votes: 0
              Watchers: 2
