Flink / FLINK-35639

Upgrade to 1.19 with a job in HA state with a restart strategy crashes the job manager


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.20.0, 1.19.1
    • Fix Version/s: None
    • Component/s: API / Core

    Description

      When trying to upgrade a Flink cluster from 1.18 to 1.19 with a 1.18 job in ZooKeeper HA state, the job manager crashes with a ClassCastException; see the log below.

       

      2024-06-18 16:58:14,401 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
      org.apache.flink.util.FlinkException: JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.
          at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) ~[flink-dist-1.19.0.jar:1.19.0]
          at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
          at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
          at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
          at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
          at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
          at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
          at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
          at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
      Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
          at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.19.0.jar:1.19.0]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
          at java.lang.Thread.run(Thread.java:840) ~[?:?]
      Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
          at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
          at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
          at java.lang.Thread.run(Thread.java:840) ~[?:?]
      Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
          at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) ~[?:?]
          at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060) ~[?:?]
          at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347) ~[?:?]
          at java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679) ~[?:?]
          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
          at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) ~[?:?]
          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]
          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.19.0.jar:1.19.0]
          at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.19.0.jar:1.19.0]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
          at java.lang.Thread.run(Thread.java:840) ~[?:?]
      2024-06-18 16:58:14,403 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
      2024-06-18 16:58:14,404 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 127.0.0.1:40067
      2024-06-18 16:58:14,431 INFO  org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - Using org.apache.zookeeper.server.quorum.MultipleAddresses
      

      Reproducing

      Download 1.18 and 1.19 binary releases.

      Add the following to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:

       

      high-availability: zookeeper
      high-availability.zookeeper.quorum: localhost
      high-availability.storageDir: file:///tmp/flink/recovery 

      Launch ZooKeeper:

      docker run --network host zookeeper:latest

      Launch the 1.18 job manager:

      ./flink-1.18.1/bin/jobmanager.sh start-foreground

      Launch the 1.18 task manager:

      ./flink-1.18.1/bin/taskmanager.sh start-foreground

      Create the following job:

      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.api.common.functions.FlatMapFunction;
      import org.apache.flink.util.Collector;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.time.Time;
      import java.util.concurrent.TimeUnit;

      public class FlinkJob {
          public static void main(String[] args) throws Exception {
              final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
              env.setRestartStrategy(
                      RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
              env.fromElements("Hello World", "Hello Flink")
                  .flatMap(new LineSplitter())
                  .groupBy(0)
                  .sum(1)
                  .print();
          }

          public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
              @Override
              public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                  for (String word : value.split(" ")) {
                      try {
                          Thread.sleep(120000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      out.collect(new Tuple2<>(word, 1));
                  }
              }
          }
      }
       

      pom.xml

      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <groupId>org.apache.flink</groupId>
          <artifactId>myflinkjob</artifactId>
          <version>1.0-SNAPSHOT</version>
      
          <properties>
              <flink.version>1.18.1</flink.version>
              <java.version>1.8</java.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
              </dependency>
          </dependencies>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.8.1</version>
                      <configuration>
                          <source>${java.version}</source>
                          <target>${java.version}</target>
                      </configuration>
                  </plugin>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-jar-plugin</artifactId>
                      <version>3.1.0</version>
                      <configuration>
                          <archive>
                              <manifest>
                                  <addClasspath>true</addClasspath>
                                  <classpathPrefix>lib/</classpathPrefix>
                                  <mainClass>FlinkJob</mainClass>
                              </manifest>
                          </archive>
                      </configuration>
                  </plugin>
              </plugins>
          </build>
      </project>
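
      Build the job jar (this step is implied by the run command below; assuming a standard Maven build from the directory that contains the pom.xml, e.g. ../flink-job/):

      mvn package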
      

      Launch job:

      ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
      Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0

      Kill job manager and task manager.

      Then launch the 1.19.0 job manager:

      ./flink-1.19.0/bin/jobmanager.sh start-foreground

      The job manager will crash with the stack trace above.

      Root cause

      It looks like the type of delayBetweenAttemptsInterval was changed from org.apache.flink.api.common.time.Time to java.time.Duration in 1.19 (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239), introducing a Java-serialization incompatibility that Flink 1.19 does not handle when it recovers the 1.18 job from HA state.
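
      For illustration only, the same failure mode can be reproduced with plain Java serialization, outside Flink. The sketch below uses hypothetical stand-in classes (LegacyTime, RestartConfig and SerializationDriftDemo are made up for this example, they are not Flink code): run it once to write the serialized object, then change the field's declared type to java.time.Duration, recompile, and run it again; the second run fails inside ObjectStreamClass$FieldReflector with the same kind of "cannot assign instance of ... to field ... of type java.time.Duration" ClassCastException as in the log above.

      import java.io.File;
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.ObjectInputStream;
      import java.io.ObjectOutputStream;
      import java.io.Serializable;
      import java.lang.reflect.Field;

      // Stand-in for org.apache.flink.api.common.time.Time (hypothetical, not Flink code).
      class LegacyTime implements Serializable {
          private static final long serialVersionUID = 1L;
          final long size;
          final String unit;
          LegacyTime(long size, String unit) { this.size = size; this.unit = unit; }
      }

      // Stand-in for RestartStrategies$FixedDelayRestartStrategyConfiguration.
      // Step 1: keep the field declared as LegacyTime and run the program (it writes /tmp/restart-config.bin).
      // Step 2: change the field type to java.time.Duration, recompile, and run again.
      class RestartConfig implements Serializable {
          private static final long serialVersionUID = 1L;   // same UID in both "versions"
          LegacyTime delayBetweenAttemptsInterval;            // step 2: java.time.Duration delayBetweenAttemptsInterval;
      }

      public class SerializationDriftDemo {
          public static void main(String[] args) throws Exception {
              File state = new File("/tmp/restart-config.bin");
              if (!state.exists()) {
                  // "1.18" side: serialize a config whose delay field holds a LegacyTime instance.
                  RestartConfig cfg = new RestartConfig();
                  Field delay = RestartConfig.class.getDeclaredField("delayBetweenAttemptsInterval");
                  delay.setAccessible(true);
                  delay.set(cfg, new LegacyTime(20, "SECONDS")); // reflective set so this file compiles in both steps
                  try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(state))) {
                      out.writeObject(cfg);
                  }
                  System.out.println("Wrote " + state + "; change the field type to java.time.Duration, recompile, rerun.");
              } else {
                  // "1.19" side: read the old bytes back against the new class definition.
                  try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(state))) {
                      System.out.println("Recovered: " + in.readObject()); // throws ClassCastException after the type change
                  }
              }
          }
      }

      This mirrors the situation here: the 1.18 cluster serialized a FixedDelayRestartStrategyConfiguration holding a Time into HA state, and the 1.19 job manager deserializes those bytes against a class whose field is now a Duration.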

      In my opinion, the job manager should not crash on startup in that case.

       


          People

            Assignee: Matthias Pohl (mapohl)
            Reporter: yazgoo
            Votes: 0
            Watchers: 2
