FLINK-7268: Zookeeper Checkpoint Store interacting with Incremental State Handles can lead to loss of handles


      Description

      Release testing for Flink 1.3.2 has shown that this combination of features leads to errors like the following when using a very low restart delay:

      java.lang.IllegalStateException: Could not initialize keyed state backend.
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.FileNotFoundException: Item not found: aljoscha/state-machine-checkpoints-2/f26e2b4c6891f2a9e0c5e4ba014733c3/chk-3/b246db8c-4f25-483a-b1fc-234f4319004d
      	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.getFileNotFoundException(GoogleCloudStorageExceptions.java:42)
      	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:551)
      	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:322)
      	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:121)
      	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1076)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
      	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
      	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1281)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1468)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1324)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1503)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:970)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
      	... 6 more
      

      When this occurs, the job is stuck in a restart loop. The problem (according to Stefan Richter) seems to be that removal of pending checkpoints from Zookeeper happens asynchronously, and those removal requests can go through after the job has already restarted, as illustrated by the sketch below.
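      To make the suspected failure mode concrete, here is a minimal, self-contained Java sketch of the race. The class, path, and variable names are hypothetical stand-ins for the checkpoint store and shared file store; this is not Flink code, only an illustration of the timing issue.

      import java.util.Set;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      // Sketch of the suspected race: an asynchronous discard of a pending
      // checkpoint deletes state files that a restarted job still needs.
      public class AsyncDiscardRace {

          // Stand-in for the shared file store holding incremental state handles.
          static final Set<String> fileStore = ConcurrentHashMap.newKeySet();

          public static void main(String[] args) throws Exception {
              fileStore.add("chk-3/state-handle");   // written by checkpoint 3

              ExecutorService asyncDiscard = Executors.newSingleThreadExecutor();

              // Job fails: the checkpoint store schedules removal of the pending
              // checkpoint asynchronously instead of completing it before recovery.
              asyncDiscard.submit(() -> fileStore.remove("chk-3/state-handle"));

              // Very low restart delay: the job restarts while the discard is in flight.
              Thread.sleep(1);

              // Restore from checkpoint 3: the outcome depends on which of the two
              // operations above ran first, i.e. it is a race.
              if (fileStore.contains("chk-3/state-handle")) {
                  System.out.println("restore succeeded (discard had not run yet)");
              } else {
                  System.out.println("restore failed: state handle already deleted");
              }

              asyncDiscard.shutdown();
          }
      }

      With a very low restart delay the discard can win this race, which would match the FileNotFoundException seen during the RocksDB incremental restore in the stack trace above.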




              People

              • Assignee: Stefan Richter (srichter)
              • Reporter: Aljoscha Krettek (aljoscha)
              • Votes: 0
              • Watchers: 3
