Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
None
-
None
Description
Adding this test to StateBackendTestBase showcases the problem:
@Test public void testConcurrentModificationWithGetKeys() throws Exception { AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); try { ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("foo", StringSerializer.INSTANCE); backend.setCurrentKey(1); backend .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor) .add("Hello"); backend.setCurrentKey(2); backend .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor) .add("Ciao"); Stream<Integer> keys = backend .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE); keys.forEach((key) -> { backend.setCurrentKey(key); try { backend .getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor) .clear(); } catch (Exception e) { e.printStackTrace(); } }); } finally { IOUtils.closeQuietly(backend); backend.dispose(); } }
This should work because one of the use cases of getKeys() and applyToAllKeys() is to do stuff for every key, which includes deleting them.
Attachments
Issue Links
- links to