  1. Flink
  2. FLINK-7830 Problematic interaction of CEP and asynchronous snapshots
  3. FLINK-7756

RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

    Details

      Description

      When I use RocksDBStateBackend on my staging cluster (which uses HDFS as the file system), it crashes. When I use FsStateBackend on the same staging cluster (also with HDFS as the file system), it works fine.

      On my local machine with the local file system, it works fine in both cases.

      Please check the attached logs. I have around 20-25 tasks in my app.

      2017-09-29 14:21:31,639 INFO  org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state to restore for the BucketingSink (taskIdx=0).
      2017-09-29 14:21:31,640 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
      2017-09-29 14:21:32,020 INFO  org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state to restore for the BucketingSink (taskIdx=1).
      2017-09-29 14:21:32,022 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
      2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil                            - Found Netty's native epoll transport in the classpath, using it
      2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a).
      2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map (2/2) (1ea5aef6ccc7031edc6b37da2912d90b).
      2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Co-Flat Map (2/2) (4bac8e764c67520d418a4c755be23d4d).
      2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task                     - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched from RUNNING to FAILED.
      AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Co-Flat Map (1/2).}
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Co-Flat Map (1/2).
      	... 6 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
      	... 5 more
      	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
      		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
      		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
      		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
      		... 5 more
      	Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
      		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
      		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
      		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
      		... 7 more
      	Caused by: java.lang.IllegalStateException
      		at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
      		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
      		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
      		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
      		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
      		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
      		... 5 more
      	[CIRCULAR REFERENCE:java.lang.IllegalStateException]
      2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map (1/2) (a06925261e74b4efdf50a30089e2b778).
      2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map (1/2) (1747902c96e63fefd977ac4d4a01d2fa).
      2017-09-29 14:21:34,180 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/2) (a06925261e74b4efdf50a30089e2b778) switched from RUNNING to FAILED.
      AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Map (1/2).}
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Map (1/2).
      	... 6 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
      	... 5 more
      	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
      		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
      		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
      		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
      		... 5 more
      	Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
      		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
      		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
      		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
      		... 7 more
      	Caused by: java.lang.IllegalStateException
      		at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
      		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
      		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
      		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
      		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
      		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
      		... 5 more
      	[CIRCULAR REFERENCE:java.lang.IllegalStateException]
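
A note on reading the trace above: the `Suppressed:` section and the closing `[CIRCULAR REFERENCE:...]` marker are standard JDK stack-trace output, not Flink-specific messages. The JDK prints that marker when the identical `Throwable` instance has already been printed earlier in the same trace. Here the `IllegalStateException` from `materializeSnapshot` is printed once under the suppressed cancel failure, so it is only referenced when the main cause chain reaches it again. The following is a minimal sketch that reproduces the same shape with plain JDK classes; the class and method names are hypothetical, and it says nothing about why the `checkState` call in Flink failed.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class: reproduces the Suppressed / CIRCULAR REFERENCE
// layout seen in the log using only JDK exceptions.
final class CircularReferenceDemo {

    static String render() {
        // One root failure instance, shared by both exception chains.
        IllegalStateException root = new IllegalStateException();

        // First chain: the failure reported when the snapshot future is awaited.
        ExecutionException materializeFailure = new ExecutionException(root);

        // Second chain: cleanup awaits the same future again and gets a new
        // wrapper around the SAME root instance.
        Exception cancelFailure = new Exception(
                "Could not properly cancel managed keyed state future.",
                new ExecutionException(root));
        materializeFailure.addSuppressed(cancelFailure);

        // The JDK prints suppressed exceptions before the cause, so the root
        // is printed in full under "Suppressed:" and only referenced afterwards.
        StringWriter out = new StringWriter();
        materializeFailure.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(render());
    }
}
```

In practice this means the trace contains a single root failure (the `IllegalStateException` thrown from `Preconditions.checkState`), and the suppressed block is a follow-up error from cleanup rather than a second, independent problem.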
      

      The same output was printed for around 12-13 tasks. Then the following logs were printed:

      2017-09-29 14:21:35,039 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source (2/2) (77c896e2a2063e98f399244cae21c260) [CANCELED]
      2017-09-29 14:21:35,041 WARN  org.apache.hadoop.ipc.Client                                  - interrupted waiting to send rpc request to server
      java.lang.InterruptedException
      	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
      	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1454)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
      	at com.sun.proxy.$Proxy12.delete(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      	at com.sun.proxy.$Proxy13.delete(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
      	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      2017-09-29 14:21:35,042 WARN  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory  - Could not delete the checkpoint stream file hdfs://static.175.87.9.5.clients.your-server.de:8020/flink/flink-checkpoints/rocksDB/events/e10dbe09aa2ecccb22737ddce8b4dc9f/chk-2/a28796de-978a-4f1a-8ff5-5f5c654b0ffc.
      java.io.IOException: java.lang.InterruptedException
      	at org.apache.hadoop.ipc.Client.call(Client.java:1460)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
      	at com.sun.proxy.$Proxy12.delete(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      	at com.sun.proxy.$Proxy13.delete(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
      	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.InterruptedException
      	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
      	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1454)
      	... 31 more
      2017-09-29 14:21:35,054 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee).
      2017-09-29 14:21:35,055 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee) switched from RUNNING to CANCELING.
      
      

      Then the same output was printed for 12-13 tasks.
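
Because the same messages repeat for many tasks, it can help to count the distinct task attempts that failed instead of reading the attached logs by eye. The sketch below assumes the log lines follow the format of the excerpts above (a 32-character hexadecimal attempt ID in parentheses, followed by `switched from RUNNING to FAILED.`). The class name `FailedTaskCounter` and its method are hypothetical helpers, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: collects the attempt IDs of tasks that switched
// from RUNNING to FAILED in a TaskManager log.
final class FailedTaskCounter {

    // Captures the 32-hex-digit attempt ID that precedes the state change,
    // e.g. "(b879f192c4e8aae6671cdafb3a24c00a) switched from RUNNING to FAILED."
    private static final Pattern FAILED =
            Pattern.compile("\\(([0-9a-f]{32})\\) switched from RUNNING to FAILED\\.");

    static Set<String> failedAttemptIds(List<String> logLines) {
        // A set removes duplicates; LinkedHashSet keeps first-seen order.
        Set<String> ids = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = FAILED.matcher(line);
            if (m.find()) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java FailedTaskCounter.java <taskmanager.log>");
            return;
        }
        Set<String> ids = failedAttemptIds(Files.readAllLines(Paths.get(args[0])));
        System.out.println(ids.size() + " task attempt(s) switched from RUNNING to FAILED");
        ids.forEach(System.out::println);
    }
}
```

Run against one of the attached TaskManager logs, this lists each failed attempt once, which makes it easier to compare the failed set with the tasks that were merely cancelled afterwards.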

        Attachments

        1. jobmanager_without_cassandra.log
          2.79 MB
          tarun razdan
        2. jobmanager.log
          365 kB
          Shashank Agarwal
        3. taskmanager_without_cassandra.log
          2.07 MB
          tarun razdan
        4. taskmanager.log
          4.03 MB
          Shashank Agarwal

            People

            • Assignee:
              aljoscha Aljoscha Krettek
              Reporter:
              shashank734 Shashank Agarwal