  1. Flink
  2. FLINK-6537 Umbrella issue for fixes to incremental snapshots
  3. FLINK-6633

Register with shared state registry before adding to CompletedCheckpointStore

    Details

      Description

      Introducing placeholders for previously existing shared state requires a change so that shared state is first registered with the SharedStateRegistry (and thereby consolidated) and only afterwards added to a CompletedCheckpointStore, so that the consolidated checkpoint is written to stable storage.
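
The ordering described above can be sketched as follows. This is a minimal, self-contained illustration, not Flink's actual code: `Handle`, `FileHandle`, `Placeholder`, `Registry`, `Store`, and the path strings are hypothetical stand-ins for the stream state handles, the SharedStateRegistry, and the CompletedCheckpointStore. It shows that registration swaps each placeholder for the handle the registry already knows, so the store only ever persists concrete handles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these are Flink classes.
interface Handle {}

final class FileHandle implements Handle {
    final String path;
    FileHandle(String path) { this.path = path; }
}

// Marks shared state that an earlier checkpoint already uploaded.
final class Placeholder implements Handle {}

final class Registry {
    private final Map<String, Handle> known = new HashMap<>();

    // Registers every entry and returns a consolidated copy in which each
    // placeholder is replaced by the handle registered earlier under that key.
    Map<String, Handle> registerAll(Map<String, Handle> sharedState) {
        Map<String, Handle> consolidated = new HashMap<>();
        for (Map.Entry<String, Handle> entry : sharedState.entrySet()) {
            Handle handle = entry.getValue();
            if (handle instanceof Placeholder) {
                handle = known.get(entry.getKey());
                if (handle == null) {
                    throw new IllegalStateException("No registered state for " + entry.getKey());
                }
            } else {
                known.put(entry.getKey(), handle);
            }
            consolidated.put(entry.getKey(), handle);
        }
        return consolidated;
    }
}

final class Store {
    final List<Map<String, Handle>> persisted = new ArrayList<>();

    // Stands in for writing to stable storage; a placeholder carries no
    // location, so it cannot be written.
    void add(Map<String, Handle> checkpoint) {
        for (Handle handle : checkpoint.values()) {
            if (handle instanceof Placeholder) {
                throw new IllegalStateException("Cannot persist a placeholder");
            }
        }
        persisted.add(checkpoint);
    }
}

final class RegistrationOrderSketch {
    // Register first (consolidating placeholders), then add to the store.
    static Map<String, Handle> complete(Registry registry, Store store, Map<String, Handle> sharedState) {
        Map<String, Handle> consolidated = registry.registerAll(sharedState);
        store.add(consolidated);
        return consolidated;
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Store store = new Store();

        Map<String, Handle> first = new HashMap<>();
        first.put("000011.sst", new FileHandle("chk-1/a"));
        complete(registry, store, first);

        Map<String, Handle> second = new HashMap<>();
        second.put("000011.sst", new Placeholder());
        second.put("000012.sst", new FileHandle("chk-2/b"));
        Map<String, Handle> result = complete(registry, store, second);

        System.out.println(((FileHandle) result.get("000011.sst")).path);
    }
}
```

Calling `store.add(sharedState)` before `registry.registerAll(...)` in this sketch would throw for the second checkpoint, which mirrors why the order matters.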

        Activity

        srichter Stefan Richter added a comment -

        fixed in 0162543ac1

        gyfora Gyula Fora added a comment -

        I have tried your fix. Restore now works for incremental checkpoints, but the first checkpoint afterwards fails:

        org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 4.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:853)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:772)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164)
        at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:287)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:843)

        full log:
        https://gist.github.com/gyfora/cf1894158ddbc5bbba2c0cc70d69b505
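
The `Caused by` line suggests a serializer that dispatches on the concrete handle type and rejects anything it does not recognize. The following sketches that failure mode only, assuming an `instanceof`-based dispatch. `SketchSerializer`, `StateHandle`, `FileStateHandle`, `ByteStateHandle`, and `PlaceholderHandle` are hypothetical names and do not reproduce the code of `SavepointV2Serializer`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical handle types for illustration; not Flink classes.
interface StateHandle {}

final class FileStateHandle implements StateHandle {
    final String path;
    FileStateHandle(String path) { this.path = path; }
}

final class ByteStateHandle implements StateHandle {
    final byte[] data;
    ByteStateHandle(byte[] data) { this.data = data; }
}

// A handle with no location of its own, like a placeholder for shared state.
final class PlaceholderHandle implements StateHandle {}

final class SketchSerializer {
    private static final byte FILE = 1;
    private static final byte BYTES = 2;

    // Writes a type tag followed by the payload for the known handle types
    // and fails for every other implementation.
    static void serialize(StateHandle handle, DataOutputStream out) throws IOException {
        if (handle instanceof FileStateHandle) {
            out.writeByte(FILE);
            out.writeUTF(((FileStateHandle) handle).path);
        } else if (handle instanceof ByteStateHandle) {
            byte[] data = ((ByteStateHandle) handle).data;
            out.writeByte(BYTES);
            out.writeInt(data.length);
            out.write(data);
        } else {
            throw new IOException("Unknown implementation of StreamStateHandle: " + handle.getClass());
        }
    }

    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        serialize(new FileStateHandle("chk-1/a"), out);
        try {
            serialize(new PlaceholderHandle(), out);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this assumption, a placeholder that reaches the serializer unreplaced fails the whole write, which would match a checkpoint that cannot be finalized.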

        aljoscha Aljoscha Krettek added a comment -

        Is this fixed by FLINK-6640 or a new/different issue?

        srichter Stefan Richter added a comment -

        This is fixed through the issue you mentioned.

        cresny@gmail.com Cliff Resnick added a comment -

        The issue that Gyula Fora mentioned still exists in the current 1.4-SNAPSHOT, at least when using externalized checkpoints. It does not necessarily happen on the first checkpoint after restore, but it does seem to stem from a job restart from an externalized checkpoint. To help identify the cause, I added a bit of logging to both RocksDBKeyedStateBackend and SavepointV2Serializer; I'm attaching the results to the issue. The log spans several checkpoints. You can see where sst files are mapped, then serialized. The last checkpoint (7) fails when it seems to try to serialize a Placeholder instead of 000027.sst.

        I hope this helps. If I can add logging to capture more relevant state please let me know (the test is reproducible).

        By the way, I also noticed that some sst files are re-serialized in subsequent checkpoints though their byte size does not change. Is that because they are still "hot" in RocksDB? I'm a bit sketchy on the concept so please forgive me!
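
To follow which sst files were uploaded and which were mapped to placeholders, the added debug lines can be tallied mechanically. A small sketch, assuming the log lines keep the two shapes produced by the extra logging (`mapping <file> ->  ...` and `<file> exists already, mapping it ->  PlaceholderStreamStateHandle`); the class name `SstMappingScan` is made up for this example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the debug log; not part of Flink.
final class SstMappingScan {
    // Matches lines such as "... - mapping 000011.sst ->  File State: ...".
    private static final Pattern UPLOADED =
            Pattern.compile("- mapping (\\d+\\.sst) ->");
    // Matches "... - 000011.sst exists already, mapping it ->  PlaceholderStreamStateHandle".
    private static final Pattern PLACEHOLDER =
            Pattern.compile("- (\\d+\\.sst) exists already, mapping it ->\\s+PlaceholderStreamStateHandle");

    // Returns one entry per matching log line, in log order, prefixed with
    // "uploaded" or "placeholder". Lines of any other shape are skipped.
    static List<String> classify(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            Matcher uploaded = UPLOADED.matcher(line);
            if (uploaded.find()) {
                result.add("uploaded " + uploaded.group(1));
                continue;
            }
            Matcher placeholder = PLACEHOLDER.matcher(line);
            if (placeholder.find()) {
                result.add("placeholder " + placeholder.group(1));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: SstMappingScan <logfile>");
            return;
        }
        for (String entry : classify(Files.readAllLines(Paths.get(args[0])))) {
            System.out.println(entry);
        }
    }
}
```

The scan only classifies lines; deciding whether a given placeholder is legitimate still requires knowing which files the restored checkpoint already referenced.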

        cresny@gmail.com Cliff Resnick added a comment -
        2017-06-20 18:44:39.376 [ip-10-150-96-228] INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to load RocksDB native library and store it under '/media/flink/tmp0'
        2017-06-20 18:44:39.378 [ip-10-150-96-228] DEBUG org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to create RocksDB native library folder /media/flink/tmp0/rocksdb-lib-18151a61e3774f0bcd2b1adeed79010e
        2017-06-20 18:44:39.476 [ip-10-150-96-228] INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Successfully loaded RocksDB native library
        2017-06-20 18:44:39.521 [ip-10-150-96-228] INFO  o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
        2017-06-20 18:44:39.522 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Restoring snapshot from state handles: null.
        2017-06-20 18:45:45.067 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:45:47.241 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
        2017-06-20 18:45:47.241 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000011.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
        2017-06-20 18:45:47.243 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/050ebbaf-d0e6-4a54-90f0-aee0e8eaca21'}
        2017-06-20 18:45:47.451 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/09c9f86b-8742-4871-8827-8f65f1484a8e [8289 bytes]
        2017-06-20 18:45:47.451 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/3db783b5-bec0-4f2a-8f9f-43b80da7ab41'}
        2017-06-20 18:45:51.609 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 1
        2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/a5431b25-aff1-4a23-ae29-a748deba6dea [30679044 bytes]
        2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/d00732ce-7226-4247-bdac-7f03d259e575 [23438 bytes]
        2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
        2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/050ebbaf-d0e6-4a54-90f0-aee0e8eaca21'}
        2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/09c9f86b-8742-4871-8827-8f65f1484a8e [8289 bytes]
        2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/3db783b5-bec0-4f2a-8f9f-43b80da7ab41'}
        2017-06-20 18:45:51.622 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/6f6bdb7c-9bd7-48d2-8705-0f15f34ac8f8 [7209046 bytes]
        2017-06-20 18:45:51.625 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/341a1a9a-5a0a-4215-bfce-e4f57b1fd4e6 [1993606 bytes]
        2017-06-20 18:45:51.626 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/15334b43-0f8a-4d71-a1f5-3e30b8be20ce [3862 bytes]
        2017-06-20 18:46:40.859 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:46:43.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f29a179a-0e28-43ad-974c-c6f1f9cd521a [6195978 bytes]
        2017-06-20 18:46:43.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000012.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f29a179a-0e28-43ad-974c-c6f1f9cd521a [6195978 bytes]
        2017-06-20 18:46:43.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000011.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:46:43.401 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/3f834e66-a6d7-43f5-a698-5a15fe3c195c'}
        2017-06-20 18:46:43.566 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/4e5100fe-3685-4adc-b50c-32e8d4b270cc [8289 bytes]
        2017-06-20 18:46:43.566 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f8f12f55-9c30-4d4c-8331-c3531ad06a46'}
        2017-06-20 18:46:46.655 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 2
        2017-06-20 18:46:46.676 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/9a7440d7-2ef3-43f8-b2cc-5989496a4fd5 [59118493 bytes]
        2017-06-20 18:46:46.677 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/aa5f6363-f6f8-43ed-a089-0b8670f685de [23438 bytes]
        2017-06-20 18:46:46.677 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
        2017-06-20 18:46:46.677 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f29a179a-0e28-43ad-974c-c6f1f9cd521a [6195978 bytes]
        2017-06-20 18:46:46.677 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/3f834e66-a6d7-43f5-a698-5a15fe3c195c'}
        2017-06-20 18:46:46.677 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/4e5100fe-3685-4adc-b50c-32e8d4b270cc [8289 bytes]
        2017-06-20 18:46:46.678 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f8f12f55-9c30-4d4c-8331-c3531ad06a46'}
        2017-06-20 18:46:46.678 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/b4a3de3b-fd12-4b92-8766-18d44935edb4 [10946667 bytes]
        2017-06-20 18:46:46.679 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/c8af105d-110b-4d43-b340-7c09bf56f167 [183402 bytes]
        2017-06-20 18:46:46.679 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/552231ff-0501-4c5a-a6d8-78f5d3a77671 [3862 bytes]
        2017-06-20 18:47:41.402 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:47:43.619 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/7039262c-c115-4166-973f-201953e936d9 [6235839 bytes]
        2017-06-20 18:47:43.619 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000013.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/7039262c-c115-4166-973f-201953e936d9 [6235839 bytes]
        2017-06-20 18:47:43.619 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000012.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:47:43.619 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000011.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:47:43.619 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/b2ba7c4a-5406-44e3-96c5-f06f00af42f2'}
        2017-06-20 18:47:43.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/eab32979-1b10-4b85-ad95-5eb29ead6c6f [8289 bytes]
        2017-06-20 18:47:43.827 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/a04901fd-4db0-4b59-9857-f6f18ed9fc97'}
        2017-06-20 18:47:49.197 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 3
        2017-06-20 18:47:49.211 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/ac5133c2-9614-45ab-9add-de3786037ca6 [91657876 bytes]
        2017-06-20 18:47:49.212 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/b8ebb0a4-7eb6-4ef7-a2ef-b5db02b729f3 [23438 bytes]
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f29a179a-0e28-43ad-974c-c6f1f9cd521a [6195978 bytes]
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/7039262c-c115-4166-973f-201953e936d9 [6235839 bytes]
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/b2ba7c4a-5406-44e3-96c5-f06f00af42f2'}
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/eab32979-1b10-4b85-ad95-5eb29ead6c6f [8289 bytes]
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/a04901fd-4db0-4b59-9857-f6f18ed9fc97'}
        2017-06-20 18:47:49.213 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/c5699456-c7c3-4d3c-907d-ea18931ecf8c [13673706 bytes]
        2017-06-20 18:47:49.214 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/122cef88-4f34-44d1-8b19-fd8a0ac4248a [575194 bytes]
        2017-06-20 18:47:49.214 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/6acec33e-e64f-4f70-8bb5-565612cfcb6a [3862 bytes]
        2017-06-20 18:48:42.053 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:48:45.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/4b14dd33-b0d3-407e-98d0-68909f074738 [5897561 bytes]
        2017-06-20 18:48:45.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000014.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/4b14dd33-b0d3-407e-98d0-68909f074738 [5897561 bytes]
        2017-06-20 18:48:45.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000013.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:48:45.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000012.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:48:45.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000011.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:48:45.827 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/b0cb6026-ced2-4295-bc6f-cc0ae4db68b4'}
        2017-06-20 18:48:46.375 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/ae7a1034-641d-4adc-8dc3-7a8937ac7715 [8289 bytes]
        2017-06-20 18:48:46.381 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/bfcdc87a-5210-4bf2-abdc-471af0b345c7'}
        2017-06-20 18:48:52.977 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 4
        2017-06-20 18:48:52.995 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/a8f275b8-137e-4a32-8485-fcf6f4b04f4e [122278166 bytes]
        2017-06-20 18:48:52.996 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/dc837ed4-a3a2-444c-afb5-6b498855f3fb [23438 bytes]
        2017-06-20 18:48:52.996 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
        2017-06-20 18:48:52.996 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-2/f29a179a-0e28-43ad-974c-c6f1f9cd521a [6195978 bytes]
        2017-06-20 18:48:52.996 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/4b14dd33-b0d3-407e-98d0-68909f074738 [5897561 bytes]
        2017-06-20 18:48:52.996 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-3/7039262c-c115-4166-973f-201953e936d9 [6235839 bytes]
        2017-06-20 18:48:52.997 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/b0cb6026-ced2-4295-bc6f-cc0ae4db68b4'}
        2017-06-20 18:48:52.997 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/ae7a1034-641d-4adc-8dc3-7a8937ac7715 [8289 bytes]
        2017-06-20 18:48:52.997 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/bfcdc87a-5210-4bf2-abdc-471af0b345c7'}
        2017-06-20 18:48:52.997 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/837cc4cb-79e0-4331-8bf7-375da34b1e19 [15582381 bytes]
        2017-06-20 18:48:52.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/4fda3595-c918-4585-8f0f-f374fe767aae [884927 bytes]
        2017-06-20 18:48:52.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/138727b1-5698-481d-a7d9-bf33509715d2 [3862 bytes]
        2017-06-20 18:49:09.900 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - deleting instance base path /media/flink/tmp1/flink-io-ca33df63-1d07-42f1-b354-1d15ea7a8462/job-4717381ff29adb6cf31273d383179850_op-WindowOperator_8_0_uuid-1c58b736-0bdd-4bc5-959a-19265d669828
        2017-06-20 18:49:56.401 [ip-10-150-96-53] INFO  org.apache.flink.runtime.checkpoint.savepoint.SavepointStore  - Loading savepoint from s3://mm-flink-checkpoints/cliff/checkpoint_metadata-c20dc62d0085
        2017-06-20 18:49:56.584 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - deserialize 4:
         [OperatorState(operatorID: 3c249f46784e7918941b8254a455626e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: 2507a43fb0c9c91dd4f7e60bcfef8d4e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: bb0199e122f4970209706820e4def95b, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 122278166), OperatorState(operatorID: d1c48e48788f3367ffb96f7809bcfb9d, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 39732540), OperatorState(operatorID: 44aee4921a91b9a2911691cc10171a24, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 884927), OperatorState(operatorID: cc6f3a804a6c2964c83f3008635bbb25, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 3862), OperatorState(operatorID: 70d8f5f0f7af92550fed76e829eeea46, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0)]
         []
        2017-06-20 18:49:57.076 [ip-10-150-96-228] INFO  o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
        2017-06-20 18:49:57.077 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Restoring snapshot from state handles: [org.apache.flink.runtime.state.IncrementalKeyedStateHandle@1c5e4f7f].
        2017-06-20 18:49:57.079 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - restoring instance org.apache.flink.runtime.state.IncrementalKeyedStateHandle@1c5e4f7f, hasExtrayKeys: false
        2017-06-20 18:50:57.184 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:50:59.684 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes]
        2017-06-20 18:50:59.685 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000021.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes]
        2017-06-20 18:51:00.663 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes]
        2017-06-20 18:51:00.663 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000019.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes]
        2017-06-20 18:51:00.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/ecd262fe-3996-4a33-b62b-d839c77a64ee [1263 bytes]
        2017-06-20 18:51:01.393 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/bd6229de-fb7b-451d-bcf7-ec481f1bcde3 [8290 bytes]
        2017-06-20 18:51:01.393 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/b8fcab59-a115-4f14-ab95-c1e175cf3e73'}
        2017-06-20 18:51:09.508 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 5
        2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/6fb709dc-3b84-4941-848d-715225729b84 [146716528 bytes]
        2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/51715b7a-a737-451b-923a-b5df5324ee5b [23438 bytes]
        2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes]
        2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes]
        2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/bd6229de-fb7b-451d-bcf7-ec481f1bcde3 [8290 bytes]
        2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/ecd262fe-3996-4a33-b62b-d839c77a64ee [1263 bytes]
        2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/b8fcab59-a115-4f14-ab95-c1e175cf3e73'}
        2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/0019c0cb-19c0-48d8-82eb-2500c59ba7b9 [17007402 bytes]
        2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/b3eb5d05-55ac-452d-983a-8fe2502b595d [167751 bytes]
        2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/166bd57e-4cc3-45b9-bac8-db237fdbf5f4 [3862 bytes]
        2017-06-20 18:52:04.195 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/5202fb61-b86c-4561-ad55-6fd22b6d58b4 [5757950 bytes]
        2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000022.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/5202fb61-b86c-4561-ad55-6fd22b6d58b4 [5757950 bytes]
        2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000021.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000019.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:52:06.633 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/f6ee925f-8c98-48de-a8b9-c4ae74b9a321 [1459 bytes]
        2017-06-20 18:52:06.875 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/a2c86d4e-905f-4e14-afa7-01770d70da2f [8290 bytes]
        2017-06-20 18:52:06.875 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/199226d9-3013-48c6-a2bf-69998a3cb69a'}
        2017-06-20 18:52:21.701 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 6
        2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/d8b31f29-10f5-442e-b066-1c941c194bf9 [179631520 bytes]
        2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/a981d09b-f2e7-4c1c-b1e5-b6d8993a5723 [23438 bytes]
        2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/5202fb61-b86c-4561-ad55-6fd22b6d58b4 [5757950 bytes]
        2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes]
        2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes]
        2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/a2c86d4e-905f-4e14-afa7-01770d70da2f [8290 bytes]
        2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/f6ee925f-8c98-48de-a8b9-c4ae74b9a321 [1459 bytes]
        2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/199226d9-3013-48c6-a2bf-69998a3cb69a'}
        2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/25e15f27-0407-4352-9adb-a11aa02283e5 [18370953 bytes]
        2017-06-20 18:52:21.716 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/e84b1e6d-aa51-425c-83fe-fa4cd86baddf [3216752 bytes]
        2017-06-20 18:52:21.716 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/ab269cfb-d911-47b9-8e60-5e33d05e37c5 [3862 bytes]
        2017-06-20 18:52:34.079 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - deleting instance base path /media/flink/tmp1/flink-io-ca33df63-1d07-42f1-b354-1d15ea7a8462/job-cc5a467696318b3fc3abadb7094bdcab_op-WindowOperator_26_0_uuid-17e31040-4998-4152-b689-1cb151ab78d9
        2017-06-20 18:52:47.431 [ip-10-150-96-53] INFO  org.apache.flink.runtime.checkpoint.savepoint.SavepointStore  - Loading savepoint from s3://mm-flink-checkpoints/cliff/checkpoint_metadata-a3584d902d7e
        2017-06-20 18:52:47.881 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - deserialize 6:
         [OperatorState(operatorID: 3c249f46784e7918941b8254a455626e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: 2507a43fb0c9c91dd4f7e60bcfef8d4e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: bb0199e122f4970209706820e4def95b, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 179631520), OperatorState(operatorID: d1c48e48788f3367ffb96f7809bcfb9d, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 40917535), OperatorState(operatorID: 44aee4921a91b9a2911691cc10171a24, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 3216752), OperatorState(operatorID: cc6f3a804a6c2964c83f3008635bbb25, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 3862), OperatorState(operatorID: 70d8f5f0f7af92550fed76e829eeea46, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0)]
         []
        2017-06-20 18:52:48.399 [ip-10-150-96-228] INFO  o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
        2017-06-20 18:52:48.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Restoring snapshot from state handles: [org.apache.flink.runtime.state.IncrementalKeyedStateHandle@68123875].
        2017-06-20 18:52:48.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - restoring instance org.apache.flink.runtime.state.IncrementalKeyedStateHandle@68123875, hasExtrayKeys: false
        2017-06-20 18:53:58.658 [ip-10-150-96-228] INFO  com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
        2017-06-20 18:54:01.517 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/6d3ea961-07bf-4425-b6da-faa78557afd6 [5932605 bytes]
        2017-06-20 18:54:01.517 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 000027.sst ->  File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/6d3ea961-07bf-4425-b6da-faa78557afd6 [5932605 bytes]
        2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000022.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000021.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 000019.sst exists already, mapping it ->  PlaceholderStreamStateHandle
        2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/842f8194-49e4-4ca0-83d0-5adf436fdc07'}
        2017-06-20 18:54:01.779 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/ed2adfcc-43f7-49f1-a541-23216c59b5ea [8290 bytes]
        2017-06-20 18:54:01.779 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/0315f986-b0b7-41f7-99f7-2d17a838129d'}
        2017-06-20 18:54:17.982 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 7
        2017-06-20 18:54:17.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/ed3a0477-503f-4a3f-9d60-6d7fbd2f3fb3 [208469835 bytes]
        2017-06-20 18:54:17.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/d93b6051-7494-40d8-975f-18032232ee22 [23438 bytes]
        2017-06-20 18:54:17.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing org.apache.flink.runtime.state.PlaceholderStreamStateHandle@4c6d48f5
        2017-06-20 18:54:38.575 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - deleting instance base path /media/flink/tmp0/flink-io-2ba7a991-77cf-421e-bbc8-05dbaa8cc93f/job-95a7913c5380d1eb1c71fff9ce607045_op-WindowOperator_44_0_uuid-d0369bd3-d35a-4ade-85df-c0cfd1f0ac43
        
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/ae7a1034-641d-4adc-8dc3-7a8937ac7715 [8289 bytes] 2017-06-20 18:48:52.997 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/bfcdc87a-5210-4bf2-abdc-471af0b345c7'} 2017-06-20 18:48:52.997 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/837cc4cb-79e0-4331-8bf7-375da34b1e19 [15582381 bytes] 2017-06-20 18:48:52.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/4fda3595-c918-4585-8f0f-f374fe767aae [884927 bytes] 2017-06-20 18:48:52.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-4/138727b1-5698-481d-a7d9-bf33509715d2 [3862 bytes] 2017-06-20 18:49:09.900 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - deleting instance base path /media/flink/tmp1/flink-io-ca33df63-1d07-42f1-b354-1d15ea7a8462/job-4717381ff29adb6cf31273d383179850_op-WindowOperator_8_0_uuid-1c58b736-0bdd-4bc5-959a-19265d669828 2017-06-20 18:49:56.401 [ip-10-150-96-53] INFO org.apache.flink.runtime.checkpoint.savepoint.SavepointStore - Loading savepoint from s3://mm-flink-checkpoints/cliff/checkpoint_metadata-c20dc62d0085 2017-06-20 18:49:56.584 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - deserialize 4: [OperatorState(operatorID: 3c249f46784e7918941b8254a455626e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 
0), OperatorState(operatorID: 2507a43fb0c9c91dd4f7e60bcfef8d4e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: bb0199e122f4970209706820e4def95b, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 122278166), OperatorState(operatorID: d1c48e48788f3367ffb96f7809bcfb9d, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 39732540), OperatorState(operatorID: 44aee4921a91b9a2911691cc10171a24, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 884927), OperatorState(operatorID: cc6f3a804a6c2964c83f3008635bbb25, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 3862), OperatorState(operatorID: 70d8f5f0f7af92550fed76e829eeea46, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0)] [] 2017-06-20 18:49:57.076 [ip-10-150-96-228] INFO o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Initializing RocksDB keyed state backend from snapshot. 2017-06-20 18:49:57.077 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Restoring snapshot from state handles: [org.apache.flink.runtime.state.IncrementalKeyedStateHandle@1c5e4f7f]. 
2017-06-20 18:49:57.079 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - restoring instance org.apache.flink.runtime.state.IncrementalKeyedStateHandle@1c5e4f7f, hasExtrayKeys: false 2017-06-20 18:50:57.184 [ip-10-150-96-228] INFO com.mediamath.reporting.combiner.Combiner - start combiner snapshot for id 0 2017-06-20 18:50:59.684 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes] 2017-06-20 18:50:59.685 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - mapping 000021.sst -> File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes] 2017-06-20 18:51:00.663 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes] 2017-06-20 18:51:00.663 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - mapping 000019.sst -> File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes] 2017-06-20 18:51:00.826 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/ecd262fe-3996-4a33-b62b-d839c77a64ee [1263 bytes] 2017-06-20 18:51:01.393 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from 
stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/bd6229de-fb7b-451d-bcf7-ec481f1bcde3 [8290 bytes] 2017-06-20 18:51:01.393 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/b8fcab59-a115-4f14-ab95-c1e175cf3e73'} 2017-06-20 18:51:09.508 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serialize 5 2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/6fb709dc-3b84-4941-848d-715225729b84 [146716528 bytes] 2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/51715b7a-a737-451b-923a-b5df5324ee5b [23438 bytes] 2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes] 2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes] 2017-06-20 18:51:09.519 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/bd6229de-fb7b-451d-bcf7-ec481f1bcde3 [8290 bytes] 2017-06-20 
18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/ecd262fe-3996-4a33-b62b-d839c77a64ee [1263 bytes] 2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/b8fcab59-a115-4f14-ab95-c1e175cf3e73'} 2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/0019c0cb-19c0-48d8-82eb-2500c59ba7b9 [17007402 bytes] 2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/b3eb5d05-55ac-452d-983a-8fe2502b595d [167751 bytes] 2017-06-20 18:51:09.520 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/166bd57e-4cc3-45b9-bac8-db237fdbf5f4 [3862 bytes] 2017-06-20 18:52:04.195 [ip-10-150-96-228] INFO com.mediamath.reporting.combiner.Combiner - start combiner snapshot for id 0 2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/5202fb61-b86c-4561-ad55-6fd22b6d58b4 [5757950 bytes] 2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - mapping 000022.sst -> File State: 
s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/5202fb61-b86c-4561-ad55-6fd22b6d58b4 [5757950 bytes] 2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 000021.sst exists already, mapping it -> PlaceholderStreamStateHandle 2017-06-20 18:52:06.304 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 000019.sst exists already, mapping it -> PlaceholderStreamStateHandle 2017-06-20 18:52:06.633 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/f6ee925f-8c98-48de-a8b9-c4ae74b9a321 [1459 bytes] 2017-06-20 18:52:06.875 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/a2c86d4e-905f-4e14-afa7-01770d70da2f [8290 bytes] 2017-06-20 18:52:06.875 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/199226d9-3013-48c6-a2bf-69998a3cb69a'} 2017-06-20 18:52:21.701 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serialize 6 2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/d8b31f29-10f5-442e-b066-1c941c194bf9 [179631520 bytes] 2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: 
s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/a981d09b-f2e7-4c1c-b1e5-b6d8993a5723 [23438 bytes] 2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/5202fb61-b86c-4561-ad55-6fd22b6d58b4 [5757950 bytes] 2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/3d34f19b-c202-4178-8c52-a5e37c3d9901 [11357099 bytes] 2017-06-20 18:52:21.714 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-5/a0a10512-d3c1-4a58-9aa4-5be707b9b089 [5398330 bytes] 2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/a2c86d4e-905f-4e14-afa7-01770d70da2f [8290 bytes] 2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/f6ee925f-8c98-48de-a8b9-c4ae74b9a321 [1459 bytes] 2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/199226d9-3013-48c6-a2bf-69998a3cb69a'} 2017-06-20 18:52:21.715 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: 
s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/25e15f27-0407-4352-9adb-a11aa02283e5 [18370953 bytes] 2017-06-20 18:52:21.716 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/e84b1e6d-aa51-425c-83fe-fa4cd86baddf [3216752 bytes] 2017-06-20 18:52:21.716 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/cc5a467696318b3fc3abadb7094bdcab/chk-6/ab269cfb-d911-47b9-8e60-5e33d05e37c5 [3862 bytes] 2017-06-20 18:52:34.079 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - deleting instance base path /media/flink/tmp1/flink-io-ca33df63-1d07-42f1-b354-1d15ea7a8462/job-cc5a467696318b3fc3abadb7094bdcab_op-WindowOperator_26_0_uuid-17e31040-4998-4152-b689-1cb151ab78d9 2017-06-20 18:52:47.431 [ip-10-150-96-53] INFO org.apache.flink.runtime.checkpoint.savepoint.SavepointStore - Loading savepoint from s3://mm-flink-checkpoints/cliff/checkpoint_metadata-a3584d902d7e 2017-06-20 18:52:47.881 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - deserialize 6: [OperatorState(operatorID: 3c249f46784e7918941b8254a455626e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: 2507a43fb0c9c91dd4f7e60bcfef8d4e, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0), OperatorState(operatorID: bb0199e122f4970209706820e4def95b, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 179631520), OperatorState(operatorID: d1c48e48788f3367ffb96f7809bcfb9d, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 40917535), OperatorState(operatorID: 44aee4921a91b9a2911691cc10171a24, parallelism: 1, maxParallelism: 128, sub task 
states: 1, total size (bytes): 3216752), OperatorState(operatorID: cc6f3a804a6c2964c83f3008635bbb25, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 3862), OperatorState(operatorID: 70d8f5f0f7af92550fed76e829eeea46, parallelism: 1, maxParallelism: 128, sub task states: 1, total size (bytes): 0)] [] 2017-06-20 18:52:48.399 [ip-10-150-96-228] INFO o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Initializing RocksDB keyed state backend from snapshot. 2017-06-20 18:52:48.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Restoring snapshot from state handles: [org.apache.flink.runtime.state.IncrementalKeyedStateHandle@68123875]. 2017-06-20 18:52:48.400 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - restoring instance org.apache.flink.runtime.state.IncrementalKeyedStateHandle@68123875, hasExtrayKeys: false 2017-06-20 18:53:58.658 [ip-10-150-96-228] INFO com.mediamath.reporting.combiner.Combiner - start combiner snapshot for id 0 2017-06-20 18:54:01.517 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/6d3ea961-07bf-4425-b6da-faa78557afd6 [5932605 bytes] 2017-06-20 18:54:01.517 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - mapping 000027.sst -> File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/6d3ea961-07bf-4425-b6da-faa78557afd6 [5932605 bytes] 2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 000022.sst exists already, mapping it -> PlaceholderStreamStateHandle 2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 000021.sst exists already, mapping it -> 
PlaceholderStreamStateHandle 2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 000019.sst exists already, mapping it -> PlaceholderStreamStateHandle 2017-06-20 18:54:01.518 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/842f8194-49e4-4ca0-83d0-5adf436fdc07'} 2017-06-20 18:54:01.779 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/ed2adfcc-43f7-49f1-a541-23216c59b5ea [8290 bytes] 2017-06-20 18:54:01.779 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/0315f986-b0b7-41f7-99f7-2d17a838129d'} 2017-06-20 18:54:17.982 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serialize 7 2017-06-20 18:54:17.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/ed3a0477-503f-4a3f-9d60-6d7fbd2f3fb3 [208469835 bytes] 2017-06-20 18:54:17.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/95a7913c5380d1eb1c71fff9ce607045/chk-7/d93b6051-7494-40d8-975f-18032232ee22 [23438 bytes] 2017-06-20 18:54:17.998 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing 
org.apache.flink.runtime.state.PlaceholderStreamStateHandle@4c6d48f5 2017-06-20 18:54:38.575 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - deleting instance base path /media/flink/tmp0/flink-io-2ba7a991-77cf-421e-bbc8-05dbaa8cc93f/job-95a7913c5380d1eb1c71fff9ce607045_op-WindowOperator_44_0_uuid-d0369bd3-d35a-4ade-85df-c0cfd1f0ac43
        cresny@gmail.com Cliff Resnick added a comment - full log here: https://gist.github.com/cresny/d3c36896ce5692d7772979438c80944e
        srichter Stefan Richter added a comment -

        Thanks for reporting this. First, let me clarify that some re-uploads of already existing sst files are expected. This happens when the previous checkpoint was not yet confirmed to the backend: the next checkpoint then cannot reference sst files from the unconfirmed predecessor. However, in such a case the SharedStateRegistry de-duplicates the re-upload against the original file if the predecessor did get confirmed after all, so only the first registered copy of an sst file survives.
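The de-duplication described above can be illustrated with a minimal sketch. This is a toy model, not Flink's actual SharedStateRegistry: the class `ToySharedStateRegistry`, its `register` method, and the location strings are all hypothetical names chosen for illustration. It only shows the rule that the first registered copy of an sst file wins and a later re-upload under the same key is treated as redundant.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a shared state registry; NOT Flink's SharedStateRegistry. */
final class ToySharedStateRegistry {
    /** Maps an sst file name (the key) to the location registered first. */
    private final Map<String, String> locations = new HashMap<>();

    /** Re-uploaded copies that lost against an earlier registration. */
    final List<String> discarded = new ArrayList<>();

    /**
     * Registers a location under a key and returns the location a
     * checkpoint should reference afterwards (always the first one seen).
     */
    String register(String key, String location) {
        String existing = locations.get(key);
        if (existing == null) {
            locations.put(key, location);
            return location;
        }
        if (!existing.equals(location)) {
            // A later re-upload of the same sst file is redundant.
            discarded.add(location);
        }
        return existing;
    }
}

final class DeduplicationDemo {
    public static void main(String[] args) {
        ToySharedStateRegistry registry = new ToySharedStateRegistry();
        // Illustrative locations only; they do not come from the log.
        String first = registry.register("000013.sst", "chk-3/upload-a");
        String second = registry.register("000013.sst", "chk-4/upload-b");
        System.out.println(first + " / " + second + " / discarded=" + registry.discarded);
    }
}
```

In this sketch both calls return the location from the first registration, and the second upload ends up in `discarded`, which is where a real implementation would presumably schedule the duplicate file for deletion.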

        Placeholders in the serializer are a true bug. It would be very helpful if you could log a bit more. In particular, all interactions with the SharedStateRegistry are relevant, most importantly those that happen when the externalized checkpoint is loaded and re-registered with the registry after restart. The registry should never contain placeholders, only real files, because part of its purpose is to replace placeholders with their originals. You could introduce a precondition for that and see if it is ever violated. I expect that it is, because 000027.sst was detected by the backend as a new file to upload, so the only way it could become a placeholder is if, for any reason, a placeholder got registered and was mistakenly used for file de-duplication against a non-duplicate file (000027.sst). Can you provide a log that contains: triggered checkpoints, un/register interactions with the registry (inputs and results), completed checkpoints as received by the backend, the files that were written for the externalized checkpoints, and the state of the shared registry after the restores? That would be very helpful for tracking down this problem.
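The suggested precondition could look roughly like the following sketch. It assumes a simplified registry with hypothetical types (`ToyHandle`, `ToyFileHandle`, `ToyPlaceholderHandle`, `CheckedRegistry`); none of these are Flink classes, and the real registration code may be structured differently. The point is only that a placeholder may be resolved against an existing entry but must never be stored as an entry itself.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical handle types standing in for real files and placeholders. */
interface ToyHandle {}

final class ToyFileHandle implements ToyHandle {
    final String path;
    ToyFileHandle(String path) { this.path = path; }
}

final class ToyPlaceholderHandle implements ToyHandle {}

/** Toy registry that refuses to store placeholders; NOT Flink's registry. */
final class CheckedRegistry {
    private final Map<String, ToyFileHandle> files = new HashMap<>();

    /** Returns the real file for the key, replacing a placeholder with its original. */
    ToyFileHandle register(String key, ToyHandle handle) {
        ToyFileHandle known = files.get(key);
        if (handle instanceof ToyPlaceholderHandle) {
            // Precondition: a placeholder may only refer to an already registered file.
            if (known == null) {
                throw new IllegalStateException("placeholder for unregistered key: " + key);
            }
            return known;
        }
        if (known == null) {
            // In this sketch every non-placeholder handle is a ToyFileHandle.
            known = (ToyFileHandle) handle;
            files.put(key, known);
        }
        return known; // the first registered real file wins
    }
}

final class PlaceholderPreconditionDemo {
    public static void main(String[] args) {
        CheckedRegistry registry = new CheckedRegistry();
        try {
            registry.register("000027.sst", new ToyPlaceholderHandle());
        } catch (IllegalStateException e) {
            System.out.println("precondition violated: " + e.getMessage());
        }
    }
}
```

With a check like this in place, a log of the violation would show exactly which key was registered as a placeholder without a known original.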

        cresny@gmail.com Cliff Resnick added a comment -

        Ok, new gist is here:
        https://gist.github.com/cresny/87c997b558064a7ec5e8021ae7456653

        This one cancels the job after two checkpoints; the first checkpoint after recovery fails with a Placeholder serialization attempt.

        cresny@gmail.com Cliff Resnick added a comment -

        Stefan, if you need me to unpack things further please feel free to add snippets here and I'll integrate them.

        srichter Stefan Richter added a comment -

        Cliff Resnick, I think this log was already enough to figure out the problem. The good news is that basic incremental checkpointing works as expected. The bad news is that the combination of StandaloneCompletedCheckpointStore and externalized checkpoints is broken. StandaloneCompletedCheckpointStore does not recover completed checkpoints on restore and therefore does not re-register them. The fix will first add the checkpoint from which we restored to the StandaloneCompletedCheckpointStore, so that it can be registered as shared state again and references from future checkpoints work.
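The failure mode and the intended fix can be sketched with a toy model. The names here (`ToyCheckpoint`, `ToyCoordinator`, `restoreFrom`, `resolvePlaceholder`) are hypothetical and do not correspond to Flink's classes or to the actual patch; the sketch only assumes that placeholders in a new checkpoint are resolved against files registered by earlier completed checkpoints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy checkpoint: an id plus a mapping from sst file name to stored location. */
final class ToyCheckpoint {
    final long id;
    final Map<String, String> sharedFiles;

    ToyCheckpoint(long id, Map<String, String> sharedFiles) {
        this.id = id;
        this.sharedFiles = sharedFiles;
    }
}

/** Toy stand-in for a completed checkpoint store plus a shared state registry. */
final class ToyCoordinator {
    final List<ToyCheckpoint> completed = new ArrayList<>();
    private final Map<String, String> registry = new HashMap<>();

    /** Adds the checkpoint we restored from and re-registers its shared files. */
    void restoreFrom(ToyCheckpoint restored) {
        completed.add(restored);
        registry.putAll(restored.sharedFiles);
    }

    /** Resolves a placeholder for an sst file to the location of its original. */
    String resolvePlaceholder(String sstName) {
        String location = registry.get(sstName);
        if (location == null) {
            throw new IllegalStateException("no registered original for " + sstName);
        }
        return location;
    }
}

final class RestoreDemo {
    public static void main(String[] args) {
        Map<String, String> files = new HashMap<>();
        files.put("000022.sst", "chk-6/example-location"); // illustrative value
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.restoreFrom(new ToyCheckpoint(6L, files));
        System.out.println(coordinator.resolvePlaceholder("000022.sst"));
    }
}
```

If `restoreFrom` is skipped, which is analogous to the store not recovering the restored checkpoint, `resolvePlaceholder` has nothing to resolve against and the first checkpoint after recovery fails.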

        srichter Stefan Richter added a comment - I created https://issues.apache.org/jira/browse/FLINK-6964 .
        srichter Stefan Richter added a comment -

        Cliff Resnick I think I have a fix for your problem in this branch: https://github.com/StefanRRichter/flink/tree/fixRecoverStandaloneCompeltedCheckpointStore. I will probably merge it at some point later this week.

        cresny@gmail.com Cliff Resnick added a comment -

        Thanks Stefan Richter, I'll give this a try tomorrow morning EST.


          People

          • Assignee:
            srichter Stefan Richter
            Reporter:
            srichter Stefan Richter