FLINK-30158

[Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes

Details

    Description

      I am encountering a java.lang.NullPointerException when using Flink SQL to query a Kafka topic whose protobuf schema uses repeated and/or map fields.

       

      Replication steps

      1. Use a protobuf definition that uses repeated and/or map fields. This protobuf schema covers a few of the problematic scenarios I ran into:

       

      syntax = "proto3";
      package example.message;
      
      
      option java_package = "com.example.message";
      option java_multiple_files = true;
      
      message NestedType {
        int64 nested_first = 1;
        oneof nested_second {
          int64 one_of_first = 2;
          string one_of_second = 3;
        }
      }
      
      message Test {
        repeated int64 first = 1;
        map<string, NestedType> second = 2;
      } 

      2. Attempt a query on the topic, even one that excludes the problematic columns:

       

      [ERROR] Could not execute SQL statement. Reason:
      org.apache.flink.formats.protobuf.PbCodegenException: java.lang.NullPointerException
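
      The report does not include the table definition or the query, so the following is only a sketch of what step 2 might look like, not the reporter's actual DDL. The table name `test_messages`, the topic, the broker address, and the group id are placeholders. The column types assume the protobuf format's documented type mapping (repeated to ARRAY, map to MAP, nested message to ROW, with oneof members appearing as ordinary row fields), and `com.example.message.Test` follows from `java_package` plus `java_multiple_files = true` in the schema above.

```sql
-- Hypothetical reproduction table. Topic, broker address, and group id are
-- placeholders; none of these values appear in the report.
CREATE TABLE test_messages (
  -- `first` and `second` are quoted because both are SQL keywords.
  `first`  ARRAY<BIGINT>,
  `second` MAP<STRING, ROW<nested_first BIGINT, one_of_first BIGINT, one_of_second STRING>>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-30158-repro',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  -- Generated class for message Test; its jar must be on the classpath.
  'protobuf.message-class-name' = 'com.example.message.Test'
);

-- Any query that starts the Kafka source should be enough to exercise the
-- deserializer; the column selected here is an arbitrary choice.
SELECT `first` FROM test_messages;
```

      In the log further down, the exception is raised from PbRowDataDeserializationSchema.open while the source reader is being created, that is, before any record is decoded. That would be consistent with the failure appearing regardless of which columns the query selects.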

       

       

      log file:

       

      2022-11-22 15:33:59,510 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
      org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
          at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
      Caused by: java.lang.RuntimeException: Failed to fetch next result
          at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
          at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
      Caused by: java.io.IOException: Failed to fetch job execution result
          at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
          at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
      Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c)
          at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
          at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) ~[?:?]
          at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
          at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
      Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c)
          at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
          at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
          at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
          at java.lang.Thread.run(Thread.java:829) ~[?:?]
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
          at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
          at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
          at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
          at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
          at java.lang.Thread.run(Thread.java:829) ~[?:?]
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
          at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?]
          at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
          at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?]
          at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?]
          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[?:?]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?]
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
          at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
          at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
          at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
          at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
          at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
          at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
          at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
          at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
          at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
          at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
          at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
          at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
          at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
          at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]
      Caused by: org.apache.flink.formats.protobuf.PbCodegenException: java.lang.NullPointerException
          at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:126) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
          at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[?:?]
          at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[?:?]
          at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[?:?]
          at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[?:?]
          at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:286) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:692) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.lang.Thread.run(Thread.java:829) ~[?:?]
      Caused by: java.lang.NullPointerException
          at org.apache.flink.formats.protobuf.deserialize.PbCodegenRowDeserializer.pbGetMessageElementCode(PbCodegenRowDeserializer.java:106) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
          at org.apache.flink.formats.protobuf.deserialize.PbCodegenRowDeserializer.codegen(PbCodegenRowDeserializer.java:84) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
          at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:109) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
          at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[?:?]
          at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[?:?]
          at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[?:?]
          at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[?:?]
          at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:286) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:692) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
          at java.lang.Thread.run(Thread.java:829) ~[?:?]

       

       

          People

            Assignee: Unassigned
            Reporter: James Mcguire (jamesmcguirepro)
            Votes: 0
            Watchers: 6
