Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Duplicate
-
1.8.0
-
None
-
GCS +
<flink.version>1.8.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.11</scala.binary.version>
<!-- Cloud Storage: --> <!-- https://search.maven.org/search?q=g:com.google.cloud.bigdataoss --> <!-- https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/pubsub/README.md --> <dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>gcs-connector</artifactId> <version>hadoop2-1.9.16</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-fs</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop2</artifactId> <version>${hadoop.version}-${flink.version}</version> </dependency>
GCS + <flink.version>1.8.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.11</scala.binary.version> <!-- Cloud Storage: --> <!-- https: //search.maven.org/search?q=g:com.google.cloud.bigdataoss --> <!-- https: //github.com/GoogleCloudPlatform/bigdata-interop/blob/master/pubsub/README.md --> <dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>gcs-connector</artifactId> <version>hadoop2-1.9.16</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-fs</artifactId> <version>${flink.version}</version> </dependency> <!-- https: //mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop2</artifactId> <version>${hadoop.version}-${flink.version}</version> </dependency>
Description
When running one standalone-job w/ parallelism=1 + one taskmanager, you will shortly get this crash
2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-5/_metadata' already exists at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) ... 8 more Caused by: java.nio.file.FileAlreadyExistsException: Object gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-5/_metadata already exists. at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) ... 19 more 2019-04-30 22:20:03,114 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1556662802928 for job 00000000000000000000000000000000.
My guess at why; concurrent checkpoint writers are updating the _metadata resource concurrently. They should be using optimistic concurrency control with ETag on GCS, and then retry until successful.
When running with parallelism=4, you always seem to get this, even after deleting all checkpoints:
[analytics-job-cluster-668886c96b-mhqmc job] 2019-04-30 22:50:35,175 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source -> Process -> Timestamps/Watermarks -> our_events (1/4) of job 00000000000000000000000000000000 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
Or in short: with parallelism > 1, your Flink job never makes progress.
Attachments
Issue Links
- duplicates
-
FLINK-12381 W/o HA, upon a full restart, checkpointing crashes
- In Progress
- links to