Details
- Bug
- Status: Resolved
- P2
- Resolution: Fixed
- 2.2.0, 2.3.0
- ubuntu16.04, idea, java8
Description
My code is as follows:
pipeline
    // Read data
    .apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata()
    )
    .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
    // works fine
    // .apply(Distinct.create())
    // ops! -> CoderException: cannot encode a null KV
    .apply(Distinct.withRepresentativeValueFn(new Val()).withRepresentativeType(TypeDescriptors.strings()))
    .apply(MapElements.into(TypeDescriptors.nulls())
        .via(input -> {
            System.out.println(Instant.now());
            System.out.println(input);
            return null;
        }));

private static class Val implements SerializableFunction<KV<String, String>, String> {
    @Override
    public String apply(KV<String, String> input) {
        return input.getValue();
    }
}
Input words to Kafka:
word1
//after 10s
word2
Then I got the following exception:
begin
2018-01-06T11:18:52.971Z
KV{null, a}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
    at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
But if I use .apply(Distinct.create()) instead, it works fine.
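For anyone triaging this: the trace shows KvCoder.encode calling a nested KvCoder.encode, with the exception raised from Combine$GroupedValues, so the value that is null appears to be the inner KV<String, String> produced by a per-key combine. One possible reading, which this report does not confirm, is that the representative-value path combines each key with a "keep the first element" function whose result for an empty pane is null, and that with discardingFiredPanes() a later firing of the same window sees an empty pane. The plain-Java sketch below only models that assumption; it does not use Beam, and EmptyPaneSketch, combinePane, and encode are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class EmptyPaneSketch {

    /** Folds one pane with a binary combiner; an empty pane yields the assumed identity, null. */
    static <T> T combinePane(List<T> pane, BinaryOperator<T> combiner) {
        T result = null; // assumption: the combiner's identity is null
        for (T element : pane) {
            result = (result == null) ? element : combiner.apply(result, element);
        }
        return result;
    }

    /** Stand-in for a coder that rejects null values (message borrowed from the trace). */
    static String encode(Object value) {
        if (value == null) {
            throw new IllegalArgumentException("cannot encode a null KV");
        }
        return value.toString();
    }

    public static void main(String[] args) {
        BinaryOperator<String> keepFirst = (left, right) -> left;
        List<String> pane = new ArrayList<>();

        // Early firing: the pane holds one element, so there is something to emit.
        pane.add("KV{null, a}");
        System.out.println(encode(combinePane(pane, keepFirst)));

        // Discarding mode: contents that already fired are dropped.
        pane.clear();

        // Later firing of the same window with no new input: the combined value is null.
        try {
            encode(combinePane(pane, keepFirst));
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

If this reading is right, it would also be consistent with Distinct.create() not failing, provided that path never has to encode a combined KV value; that part is a guess as well and would need checking against the Distinct source.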