Description
This exception is thrown when a kafka table is created because.
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.io.kafka.KafkaIO.updateKafkaProperties(KafkaIO.java:1028)
at org.apache.beam.sdk.io.kafka.KafkaIO.access$900(KafkaIO.java:251)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read.withConsumerConfigUpdates(KafkaIO.java:814)
at org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable.buildIOReader(BeamKafkaTable.java:89)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel$Transform.expand(BeamIOSourceRel.java:67)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel$Transform.expand(BeamIOSourceRel.java:58)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
at org.apache.beam.sdk.extensions.sql.example.MyKafkaExample.main(MyKafkaExample.java:76)
This happens because in org.apache.beam.sdk.extensions.sql.meta.provider.kafka, configupdates is not initialized anywhere and the method updateConsumerProperties is never called.
Attachments
Issue Links
- links to