Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
1.5.0
Description
The test is currently @Ignored. We have to change AsyncCheckpointOperator to make sure that we can run fully asynchronously. Then, the test will still fail because the canceling behaviour was changed in the meantime.
public static class AsyncCheckpointOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { @Override public void open() throws Exception { super.open(); // also get the state in open, this way we are sure that it was created before // we trigger the test checkpoint ValueState<String> state = getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello")); } @Override public void processElement(StreamRecord<String> element) throws Exception { // we also don't care ValueState<String> state = getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello")); state.update(element.getValue()); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { // do nothing so that we don't block } }
Attachments
Issue Links
- links to