Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Not A Problem
- Affects Versions: 1.20.0, 1.19.1
- Fix Versions: None
Description
When trying to upgrade a Flink cluster from 1.18 to 1.19, with a 1.18 job in ZooKeeper HA state, I get a job manager crash with a ClassCastException; see the log below.
```
2024-06-18 16:58:14,401 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.
	at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) ~[flink-dist-1.19.0.jar:1.19.0]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.19.0.jar:1.19.0]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) ~[?:?]
	at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060) ~[?:?]
	at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347) ~[?:?]
	at java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
	at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.19.0.jar:1.19.0]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
2024-06-18 16:58:14,403 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2024-06-18 16:58:14,404 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 127.0.0.1:40067
2024-06-18 16:58:14,431 INFO org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - Using org.apache.zookeeper.server.quorum.MultipleAddresses
```
Reproducing
Download 1.18 and 1.19 binary releases.
Add the following to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost
high-availability.storageDir: file:///tmp/flink/recovery
```
Launch ZooKeeper:

```shell
docker run --network host zookeeper:latest
```
Launch the 1.18 job manager:

```shell
./flink-1.18.1/bin/jobmanager.sh start-foreground
```
Launch the 1.18 task manager:

```shell
./flink-1.18.1/bin/taskmanager.sh start-foreground
```
Create the following job:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class FlinkJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
        env.fromElements("Hello World", "Hello Flink")
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1)
                .print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                try {
                    Thread.sleep(120000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
pom.xml:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flink</groupId>
    <artifactId>myflinkjob</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.18.1</flink.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>FlinkJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
Launch the job:

```shell
./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
```

which prints:

```
Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
```
Kill the job manager and task manager, then launch the 1.19.0 job manager:

```shell
./flink-1.19.0/bin/jobmanager.sh start-foreground
```

The job manager crashes with the stack trace above.
Root cause
It looks like the type of delayBetweenAttemptsInterval was changed from Flink's Time to java.time.Duration in 1.19 (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239), introducing a Java-serialization incompatibility that Flink 1.19 does not handle.
In my opinion, the job manager should not crash on startup in that case.
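The failure mode can be reproduced with plain Java serialization, independent of Flink. The sketch below is hypothetical: `ConfigV1`/`ConfigV2` and their `delay` field stand in for `RestartStrategies$FixedDelayRestartStrategyConfiguration` and its `delayBetweenAttemptsInterval`, and the `readClassDescriptor` substitution simulates the same class having changed the field's declared type between writing (1.18) and reading (1.19):

```java
import java.io.*;
import java.time.Duration;

public class SerializationDrift {

    // Hypothetical stand-in for Flink's pre-1.19 org.apache.flink.api.common.time.Time.
    static class Time implements Serializable {
        private static final long serialVersionUID = 1L;
        final long millis;
        Time(long millis) { this.millis = millis; }
    }

    // "Old" layout (like 1.18): the delay field is the Time wrapper.
    static class ConfigV1 implements Serializable {
        private static final long serialVersionUID = 1L;
        Time delay = new Time(20_000);
    }

    // "New" layout (like 1.19): same field name, declared type changed to Duration.
    static class ConfigV2 implements Serializable {
        private static final long serialVersionUID = 1L;
        Duration delay;
    }

    /** Writes a ConfigV1, reads it back as ConfigV2; returns the failure, or null on success. */
    static Throwable roundTrip() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ConfigV1());
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())) {
            // Substitute the stream's class descriptor with the new layout's,
            // simulating the class having evolved in place between write and read.
            @Override
            protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
                ObjectStreamClass desc = super.readClassDescriptor();
                return desc.getName().equals(ConfigV1.class.getName())
                        ? ObjectStreamClass.lookup(ConfigV2.class)
                        : desc;
            }
        }) {
            ois.readObject();
            return null; // would mean the old state deserialized cleanly
        } catch (ClassCastException e) {
            // Same shape as the crash in the report: the stream carries a Time
            // instance that cannot be assigned to the Duration-typed field.
            return e;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Because the JobMaster deserializes the HA job state with a plain `ObjectInputStream` (via `InstantiationUtil.deserializeObject`, per the stack trace), any such field-type change in a serialized configuration class surfaces as a ClassCastException at startup rather than a handled migration.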
Attachments
Issue Links
- is caused by FLINK-32570: Deprecate API that uses Flink's Time implementation (related to FLINK-14638) (Status: Closed)