FLINK-22966

NPE in StateAssignmentOperation when rescaling

Details

    Description

      Reported on the user mailing list.

      From the code, it looks like if none of an operator's subtasks has state, then some of the variables used in StateAssignmentOperation.reAssignSubKeyedStates can be null:

      subManagedKeyedState.isEmpty() && subRawKeyedState.isEmpty()
      
          2021-06-09 13:08:59,849 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
          org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job '<censored>'.
                  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
                  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
                  at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
                  at java.lang.Thread.run(Thread.java:834) [?:?]
          Caused by: org.apache.flink.util.FlinkException: Failed to execute job '<censored>'.
                  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at <censored> ~[?:?]
                  at <censored> ~[?:?]
                  at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
                  at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
                  at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
                  at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
                  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  ... 12 more
          Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
                  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
                  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
                  ... 1 more
          Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
                  at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
                  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
                  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
                  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
                  ... 6 more
          Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
                  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
                  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
                  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?]
                  ... 6 more
          Caused by: java.lang.NullPointerException
                  at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reAssignSubKeyedStates(StateAssignmentOperation.java:300) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.lambda$reDistributeKeyedStates$0(StateAssignmentOperation.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at java.util.HashMap.forEach(HashMap.java:1336) ~[?:?]
                  at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeKeyedStates(StateAssignmentOperation.java:252) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:196) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
                  ... 7 more
          2021-06-09 13:08:59,852 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.
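
The null case described above can be illustrated with a minimal, self-contained sketch. This is not the actual Flink code or the fix that was applied: the class `NullSafeKeyedStateCheck`, the helpers `orEmpty` and `hasNoKeyedState`, and the use of `List<String>` in place of Flink's state handle types are all hypothetical and chosen only for illustration. It assumes the two variables are lists that may be left null when no subtask carries state, and shows one way to make the emptiness check tolerate that.

```java
import java.util.Collections;
import java.util.List;

public class NullSafeKeyedStateCheck {

    /** Returns the given list, or an empty list if it is null. */
    static <T> List<T> orEmpty(List<T> list) {
        return list == null ? Collections.<T>emptyList() : list;
    }

    /**
     * Hypothetical stand-in for the check quoted in the description.
     * Calling isEmpty() directly on a null list throws a
     * NullPointerException; normalising null to an empty list first avoids it.
     */
    static boolean hasNoKeyedState(
            List<String> subManagedKeyedState, List<String> subRawKeyedState) {
        return orEmpty(subManagedKeyedState).isEmpty()
                && orEmpty(subRawKeyedState).isEmpty();
    }

    public static void main(String[] args) {
        // No subtask had state: both lists are null, and the check still works.
        System.out.println(hasNoKeyedState(null, null)); // prints "true"

        // One list carries a (placeholder) handle, the other is null.
        List<String> managed = Collections.singletonList("handle-0");
        System.out.println(hasNoKeyedState(managed, null)); // prints "false"
    }
}
```

Whether a null list should be treated as "no state" or rejected earlier is a design decision for the real code; the sketch only shows why the unguarded expression can fail.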
      

            People

              akalashnikov Anton Kalashnikov
              roman Roman Khachatryan