FLINK-9269

Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • 1.5.0
    • 1.6.0
    • None

    Description

      @Test
      public void testConcurrencyProblem() throws Exception {
      
      	CheckpointStreamFactory streamFactory = createStreamFactory();
      	Environment env = new DummyEnvironment();
      	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
      
      	try {
      		long checkpointID = 0;
      		List<Future<?>> futureList = new ArrayList<>();
      		for (int i = 0; i < 10; ++i) {
      			ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
      			ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
      			((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
      			backend.setCurrentKey(i);
      			state.update(i);
      
      			futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
      		}
      
      		for (Future<?> future : futureList) {
      			future.get();
      		}
      	} catch (Exception e) {
      		fail();
      	} finally {
      		backend.dispose();
      	}
      }
      
      protected Future<?> runSnapshotAsync(
      	RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
      
      	if (!snapshotRunnableFuture.isDone()) {
      		return Executors.newFixedThreadPool(5).submit(() -> {
      			try {
      				snapshotRunnableFuture.run();
      				snapshotRunnableFuture.get();
      			} catch (Exception e) {
      				e.printStackTrace();
      				fail();
      			}
      		});
      	}
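      	// The snapshot already completed synchronously; return a completed future so callers can safely call get().
      	return java.util.concurrent.CompletableFuture.completedFuture(null);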
      }
      

      Place the above code in `StateBackendTestBase` and run `AsyncMemoryStateBackendTest`; it fails with the following exception:

      java.util.concurrent.ExecutionException: java.lang.NullPointerException
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      	at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662)
      	at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84)
      	... 5 more
      java.util.concurrent.ExecutionException: java.lang.NullPointerException
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      	at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662)
      	at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84)
      	... 5 more
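The report does not state the root cause. As a hedged illustration only, the sketch below shows one common way a deferred snapshot can hit a `NullPointerException` of this kind: the deferred part iterates a live registry but looks values up in a view that was frozen when the snapshot was requested, so anything registered in between is missing from the frozen view. `SnapshotRaceSketch`, `liveStates`, `register`, `unsafeSnapshot`, and `safeSnapshot` are hypothetical names invented for this example; none of this is Flink's code, and it is not confirmed to be what happens at `HeapKeyedStateBackend.java:716`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class SnapshotRaceSketch {

    /** Hypothetical stand-in for a backend's registry: state name -> value. */
    private final Map<String, Integer> liveStates = new HashMap<>();

    void register(String name, int value) {
        liveStates.put(name, value);
    }

    /** Unsafe: the deferred part reads the live registry and the frozen view together. */
    FutureTask<Integer> unsafeSnapshot() {
        Map<String, Integer> frozen = new HashMap<>(liveStates);
        return new FutureTask<Integer>(() -> {
            int sum = 0;
            for (String name : liveStates.keySet()) {
                // Returns null for states registered after the snapshot; unboxing throws NPE.
                sum += frozen.get(name);
            }
            return sum;
        });
    }

    /** Safer: the deferred part only touches the frozen copy. */
    FutureTask<Integer> safeSnapshot() {
        Map<String, Integer> frozen = new HashMap<>(liveStates);
        return new FutureTask<Integer>(() -> {
            int sum = 0;
            for (int value : frozen.values()) {
                sum += value;
            }
            return sum;
        });
    }

    /** Runs the unsafe variant with a state registered between request and execution. */
    static boolean unsafeFailsWithNpe() throws InterruptedException {
        SnapshotRaceSketch backend = new SnapshotRaceSketch();
        backend.register("id0", 1);
        FutureTask<Integer> snapshot = backend.unsafeSnapshot();
        backend.register("id1", 2); // registered after the snapshot was requested
        snapshot.run();
        try {
            snapshot.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof NullPointerException;
        }
    }

    /** Runs the safe variant under the same interleaving. */
    static int safeSum() throws Exception {
        SnapshotRaceSketch backend = new SnapshotRaceSketch();
        backend.register("id0", 1);
        FutureTask<Integer> snapshot = backend.safeSnapshot();
        backend.register("id1", 2);
        snapshot.run();
        return snapshot.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("unsafe fails with NPE: " + unsafeFailsWithNpe()); // true
        System.out.println("safe snapshot sum: " + safeSum());                // 1
    }
}
```

The interleaving is forced on a single thread so the outcome is deterministic. In the reproducer above the same ordering can arise because each loop iteration registers a new state while earlier snapshots may still be running on pool threads.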
      

            People

              Assignee: Sihua Zhou (sihuazhou)
              Reporter: Sihua Zhou (sihuazhou)