Flink / FLINK-3466

Job might get stuck in restoreState() from HDFS due to interrupt

Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.0
    • None

    Description

      A user reported the following issue with a failing job:

      10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
      org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
      org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
      org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
      org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
      org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
      org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
      java.io.DataInputStream.read(DataInputStream.java:149)
      org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
      java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
      java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
      java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
      java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
      java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
      org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      java.lang.Thread.run(Thread.java:745)
      

      and

      10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
      java.lang.Throwable.fillInStackTrace(Native Method)
      java.lang.Throwable.fillInStackTrace(Throwable.java:783)
      java.lang.Throwable.<init>(Throwable.java:250)
      java.lang.Exception.<init>(Exception.java:54)
      java.lang.InterruptedException.<init>(InterruptedException.java:57)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
      org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
      org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
      org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
      org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
      org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
      org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
      org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
      java.io.DataInputStream.read(DataInputStream.java:149)
      org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
      java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
      java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
      java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
      java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
      java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
      org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      java.lang.Thread.run(Thread.java:745)
      

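      The issue is most likely that the HDFS client gets stuck in the "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it receives an interrupt: in the first trace the thread sits in an uninterruptible wait (awaitUninterruptibly() in DfsClientShmManager.allocSlot()), and in the second it is handling an interrupt inside DomainSocketWatcher.add(), yet in both cases the task does not react to cancellation.
      One option is to move the call into a separate thread, so that the task interrupt would not reach (and break) the HadoopReadThread.

      The HadoopReadThread would eventually stop, either with an error or once the read operation has finished.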
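      The separate-thread idea can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink's implementation: the class name `SeparateThreadReader` is hypothetical, and an `ExecutorService` with a single daemon thread stands in for the "HadoopReadThread". If the task thread is interrupted while waiting, only `Future.get()` observes the interrupt; the read itself keeps running untouched.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical sketch: runs blocking reads on a dedicated daemon thread so that an
 * interrupt of the calling (task) thread never reaches the underlying stream.
 */
final class SeparateThreadReader {

    private final InputStream in;

    // single daemon thread playing the role of the "HadoopReadThread"
    private final ExecutorService reader = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "HadoopReadThread");
        t.setDaemon(true); // an abandoned read must not keep the JVM alive
        return t;
    });

    SeparateThreadReader(InputStream in) {
        this.in = in;
    }

    /** Reads into buf; if the caller is interrupted, it stops waiting but the read continues. */
    int read(byte[] buf, int off, int len) throws IOException, InterruptedException {
        Future<Integer> pending = reader.submit(() -> in.read(buf, off, len));
        try {
            // throws InterruptedException to the caller; the reader thread is NOT interrupted
            return pending.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException(cause);
        }
    }

    /** Stops accepting new reads; an in-flight read finishes (or fails) on its own. */
    void shutdown() {
        reader.shutdown();
    }
}
```

      The trade-off: once the caller gives up, the abandoned read may still write into `buf`, so that buffer must not be reused, and because the executor is single-threaded, a stuck read also blocks every later read on the same instance.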

          Activity

            Stephan Ewen added a comment:

            Here is a unit test that minimally reproduces a task getting stuck in interrupt-sensitive state handles (such as those reading from HDFS):

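            package org.apache.flink.streaming.runtime.tasks;

            import org.apache.flink.api.common.ExecutionConfig;
            import org.apache.flink.api.common.JobID;
            import org.apache.flink.configuration.Configuration;
            import org.apache.flink.core.testutils.OneShotLatch;
            import org.apache.flink.runtime.blob.BlobKey;
            import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
            import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
            import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
            import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
            import org.apache.flink.runtime.execution.ExecutionState;
            import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
            import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
            import org.apache.flink.runtime.filecache.FileCache;
            import org.apache.flink.runtime.instance.ActorGateway;
            import org.apache.flink.runtime.io.disk.iomanager.IOManager;
            import org.apache.flink.runtime.io.network.NetworkEnvironment;
            import org.apache.flink.runtime.jobgraph.JobVertexID;
            import org.apache.flink.runtime.memory.MemoryManager;
            import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
            import org.apache.flink.runtime.state.StateHandle;
            import org.apache.flink.runtime.taskmanager.Task;
            import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
            import org.apache.flink.runtime.util.EnvironmentInformation;
            import org.apache.flink.streaming.api.TimeCharacteristic;
            import org.apache.flink.streaming.api.graph.StreamConfig;
            import org.apache.flink.util.SerializedValue;

            import org.junit.Test;

            import scala.concurrent.duration.FiniteDuration;

            import java.io.IOException;
            import java.net.URL;
            import java.util.Collections;
            import java.util.concurrent.TimeUnit;

            import static org.junit.Assert.*;
            import static org.mockito.Mockito.*;

            public class InterruptSensitiveRestoreTest {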
            
            	private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();
            
            	@Test
            	public void testRestoreWithInterrupt() throws Exception {
            
            		Configuration taskConfig = new Configuration();
            		StreamConfig cfg = new StreamConfig(taskConfig);
            		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            
            		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, new InterruptLockingStateHandle());
            		Task task = createTask(tdd);
            
            		// start the task and wait until it is in "restore"
            		task.startTaskThread();
            		IN_RESTORE_LATCH.await();
            
            		// trigger cancellation and signal to continue
            		task.cancelExecution();
            
            		task.getExecutingThread().join(30000);
            
            		if (task.getExecutionState() == ExecutionState.CANCELING) {
            			fail("Task is stuck and not canceling");
            		}
            
            		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
            		assertNull(task.getFailureCause());
            	}
            
            	// ------------------------------------------------------------------------
            	//  Utilities
            	// ------------------------------------------------------------------------
            
            	private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
            			Configuration taskConfig,
            			StateHandle<?> state) throws IOException {
            		return new TaskDeploymentDescriptor(
            				new JobID(),
            				"test job name",
            				new JobVertexID(),
            				new ExecutionAttemptID(),
            				new SerializedValue<>(new ExecutionConfig()),
            				"test task name",
            				0, 1, 0,
            				new Configuration(),
            				taskConfig,
            				SourceStreamTask.class.getName(),
            				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
            				Collections.<InputGateDeploymentDescriptor>emptyList(),
            				Collections.<BlobKey>emptyList(),
            				Collections.<URL>emptyList(),
            				0,
            				new SerializedValue<StateHandle<?>>(state));
            	}
            	
            	private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
            		return new Task(
            				tdd,
            				mock(MemoryManager.class),
            				mock(IOManager.class),
            				mock(NetworkEnvironment.class),
            				mock(BroadcastVariableManager.class),
            				mock(ActorGateway.class),
            				mock(ActorGateway.class),
            				new FiniteDuration(10, TimeUnit.SECONDS),
            				new FallbackLibraryCacheManager(),
            				new FileCache(new Configuration()),
            				new TaskManagerRuntimeInfo(
            						"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
            				mock(TaskMetricGroup.class));
            		
            	}
            
            	@SuppressWarnings("serial")
            	private static class InterruptLockingStateHandle extends StreamTaskStateList {
            
            		public InterruptLockingStateHandle() throws Exception {
            			super(new StreamTaskState[0]);
            		}
            
            		@Override
            		public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
            			IN_RESTORE_LATCH.trigger();
            			
            			// this mimics what happens in the HDFS client code.
            			// an interrupt on a waiting object leads to an infinite loop
            			try {
            				synchronized (this) {
            					wait();
            				}
            			}
            			catch (InterruptedException e) {
            				while (true) {
            					try {
            						synchronized (this) {
            							wait();
            						}
            					} catch (InterruptedException ignored) {}
            				}
            			}
            			
            			return super.getState(userCodeClassLoader);
            		}
            	}
            }
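
            Stripped of the Flink scaffolding, the core of `InterruptLockingStateHandle.getState()` is a wait loop that swallows interrupts. The following self-contained sketch (class and method names are hypothetical) shows that `Thread.interrupt()` alone cannot end such a thread, which matches the "did not react to cancelling signal" warnings in the description:

```java
/**
 * Flink-free sketch of the pattern in InterruptLockingStateHandle.getState():
 * a thread that swallows InterruptedException in a loop cannot be stopped by interrupt().
 */
final class InterruptSwallowingDemo {

    /** Starts a daemon thread that waits forever, ignoring interrupts (mimicking the HDFS client). */
    static Thread startStuckThread(Object lock) {
        Thread t = new Thread(() -> {
            while (true) {
                try {
                    synchronized (lock) {
                        lock.wait();
                    }
                } catch (InterruptedException ignored) {
                    // swallowed: the interrupt status is cleared and the loop waits again
                }
            }
        }, "stuck-restore");
        t.setDaemon(true); // do not keep the JVM alive
        t.start();
        return t;
    }

    /** Interrupts the thread and reports whether it is still alive after the timeout. */
    static boolean stillAliveAfterInterrupt(Thread t, long timeoutMillis) throws InterruptedException {
        t.interrupt();
        t.join(timeoutMillis);
        return t.isAlive();
    }
}
```

            Even if the interrupt arrives before the thread first calls `wait()`, the outcome is the same: `wait()` throws immediately, the exception is swallowed, and the loop waits again.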
            
            ASF GitHub Bot added a comment:

            GitHub user StephanEwen opened a pull request:

            https://github.com/apache/flink/pull/2252

            FLINK-3466 [runtime] Cancel state handles on state restore

            This pull request fixes an issue where state restore operations can get stuck when a task is cancelled during state restore. The cause is a bug in the HDFS client, which deadlocks (or livelocks) when the reading thread is interrupted.

            This introduces two things:

            1. All state handles and key/value snapshots are now `Closeable`. Closing does not delete any checkpoint data; it simply closes pending streams and data fetch handles. Operations concurrently accessing the state handle's state should then fail.

            2. The `StreamTask` holds a set of `Closeable`s that it closes upon cancellation. This is a cleaner way of stopping in-progress work than relying on `interrupt()` to interrupt that work.

            This mechanism should eventually be extended to also cancel operators and state handles pending asynchronous materialization.
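
            The `Closeable`-based cancellation in point 2 can be illustrated with a small registry. This is a hypothetical sketch, not the `StreamTask` code from the pull request: `CancelableCloseables` and its methods are illustrative names, and resources registered after cancellation are assumed to be closed immediately so that a late restore cannot slip through.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of a task-side registry of Closeables that is closed on cancellation,
 * as an alternative to relying on Thread.interrupt().
 */
final class CancelableCloseables {

    private final Set<Closeable> registered = new HashSet<>();
    private boolean canceled; // guarded by "this"

    /** Registers a resource; if cancellation already happened, closes it right away. */
    void register(Closeable c) throws IOException {
        synchronized (this) {
            if (!canceled) {
                registered.add(c);
                return;
            }
        }
        c.close(); // late registration after cancel: close outside the lock
    }

    /** Unregisters a resource whose work completed normally. */
    synchronized void unregister(Closeable c) {
        registered.remove(c);
    }

    /** Called on cancel: closes every registered resource. */
    void cancel() {
        Set<Closeable> toClose;
        synchronized (this) {
            canceled = true;
            toClose = new HashSet<>(registered);
            registered.clear();
        }
        for (Closeable c : toClose) {
            try {
                // for typical streams, this makes pending or later reads fail (stream-dependent)
                c.close();
            } catch (IOException ignored) {
                // best effort: keep closing the remaining resources
            }
        }
    }
}
```

            Closing outside the lock avoids holding the registry's monitor while a possibly slow `close()` runs.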

            A test with an interrupt-sensitive state handle (mimicking the HDFS deadlock behavior) stalls without this pull request and finishes cleanly with the changes in this pull request.

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/StephanEwen/incubator-flink state_handle_cancellation

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/flink/pull/2252.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #2252


            commit 224503b86c2864f604a7c519ea5f415c57f35ff3
            Author: Stephan Ewen <sewen@apache.org>
            Date: 2016-07-14T13:14:12Z

            FLINK-3466 [tests] Add serialization validation for state handles

            commit c411b379381ab1390e2166356232a33165c1abd9
            Author: Stephan Ewen <sewen@apache.org>
            Date: 2016-07-13T19:32:40Z

            FLINK-3466 [runtime] Make state handles cancelable.

            State handles are cancelable, to make sure that long-running checkpoint restore operations
            finish early on cancellation, even if the code does not properly react to interrupts.

            This is especially important since HDFS client code is so buggy that it deadlocks when
            interrupted without closing.


            ASF GitHub Bot added a comment:

            Github user uce commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70943314

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java —
            @@ -0,0 +1,131 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.runtime.state;
            +
            +import java.io.Closeable;
            +import java.io.IOException;
            +import java.io.Serializable;
            +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
            +
            +/**
            + * A simple base for closable handles.
            + *
            + * Offers to register a stream (or other closable object) that close calls are delegated to if
            + * the handel is closed or was already closed.
            — End diff –

            typo: handel => handle
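
            The Javadoc in this diff describes the contract: `close()` calls are delegated to a registered stream, and a stream registered after the handle was closed is closed right away. A minimal sketch of that contract, assuming an `AtomicIntegerFieldUpdater` flips the closed flag (the diff imports one, but the rest of the real class is not shown here), could look like this; `CloseableHandleSketch`, `registerCloseable`, and `DemoHandle` are hypothetical names:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical sketch of a closeable handle base class; not the code from PR #2252. */
abstract class CloseableHandleSketch implements Closeable {

    // flips isClosed from 0 to 1 exactly once, even under concurrent close() calls
    private static final AtomicIntegerFieldUpdater<CloseableHandleSketch> CLOSER =
            AtomicIntegerFieldUpdater.newUpdater(CloseableHandleSketch.class, "isClosed");

    private volatile int isClosed; // 0 = open, 1 = closed (field updaters need a volatile int)

    private Closeable toClose; // the single registered resource, guarded by "this"

    /** Registers the resource that close() delegates to; closes it at once if already closed. */
    protected final void registerCloseable(Closeable closeable) throws IOException {
        synchronized (this) {
            if (isClosed == 0) {
                toClose = closeable;
                return;
            }
        }
        // handle was already closed: close the resource and fail the caller's operation
        closeable.close();
        throw new IOException("handle already closed");
    }

    @Override
    public void close() throws IOException {
        if (CLOSER.compareAndSet(this, 0, 1)) {
            Closeable current;
            synchronized (this) {
                current = toClose;
                toClose = null;
            }
            if (current != null) {
                current.close();
            }
        }
    }

    public boolean isClosed() {
        return isClosed != 0;
    }
}

/** Hypothetical concrete handle used to exercise the sketch. */
final class DemoHandle extends CloseableHandleSketch {
    void open(Closeable stream) throws IOException {
        registerCloseable(stream);
    }
}
```

            Reading `isClosed` inside the same lock that `close()` takes before fetching `toClose` ensures a resource is either seen and closed by `close()` or closed by `registerCloseable()` itself, never leaked.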

            ASF GitHub Bot added a comment:

            Github user uce commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70944104

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java —
            @@ -20,18 +20,21 @@

            import org.apache.flink.core.fs.FileSystem;
            import org.apache.flink.core.fs.Path;
            +import org.apache.flink.runtime.state.AbstractCloseableHandle;
            +import org.apache.flink.runtime.state.StateObject;

            import java.io.IOException;
            +import java.io.Serializable;

            import static java.util.Objects.requireNonNull;

            /**
             * Base class for state that is stored in a file.
             */
            -public abstract class AbstractFileStateHandle implements java.io.Serializable {
            +public abstract class AbstractFileStateHandle extends AbstractCloseableHandle implements StateObject, Serializable {
            — End diff –

            Makes no difference, but we can remove the `Serializable` here
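
            The reviewer's remark holds because `Serializable` is inherited. Assuming a supertype (for example `AbstractCloseableHandle`, which imports `java.io.Serializable`) already implements it, redeclaring it on the subclass changes nothing; the class names below are hypothetical stand-ins:

```java
import java.io.Serializable;

/** Stand-in for a base handle that already implements Serializable (assumption). */
abstract class BaseHandle implements Serializable {
    private static final long serialVersionUID = 1L;
}

/** Redeclaring Serializable is legal but redundant. */
abstract class FileHandleWithRedundantDecl extends BaseHandle implements Serializable {
    private static final long serialVersionUID = 1L;
}

/** Same serializability without the redundant declaration. */
abstract class FileHandle extends BaseHandle {
    private static final long serialVersionUID = 1L;
}
```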

            ASF GitHub Bot added a comment:

            Github user uce commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70946219

            — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java —
            @@ -0,0 +1,223 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.runtime.tasks;
            +
            +import org.apache.flink.api.common.ExecutionConfig;
            +import org.apache.flink.api.common.JobID;
            +import org.apache.flink.configuration.Configuration;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.runtime.blob.BlobKey;
            +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
            +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
            +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
            +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
            +import org.apache.flink.runtime.execution.ExecutionState;
            +import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
            +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
            +import org.apache.flink.runtime.filecache.FileCache;
            +import org.apache.flink.runtime.instance.ActorGateway;
            +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
            +import org.apache.flink.runtime.io.network.NetworkEnvironment;
            +import org.apache.flink.runtime.jobgraph.JobVertexID;
            +import org.apache.flink.runtime.memory.MemoryManager;
            +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
            +import org.apache.flink.runtime.state.StateHandle;
            +import org.apache.flink.runtime.taskmanager.Task;
            +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
            +import org.apache.flink.runtime.util.EnvironmentInformation;
            +import org.apache.flink.runtime.util.SerializableObject;
            +import org.apache.flink.streaming.api.TimeCharacteristic;
            +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.api.graph.StreamConfig;
            +import org.apache.flink.streaming.api.operators.StreamSource;
            +import org.apache.flink.util.SerializedValue;
            +
            +import org.junit.Test;
            +
            +import scala.concurrent.duration.FiniteDuration;
            +
            +import java.io.IOException;
            +import java.io.Serializable;
            +import java.net.URL;
            +import java.util.Collections;
            +import java.util.concurrent.TimeUnit;
            +
            +import static org.junit.Assert.*;
            +import static org.mockito.Mockito.*;
            +
            +/**
            + * This test checks that task restores that get stuck in the presence of interrupts
            + * are handled properly.
            + *
            + * In practice, reading from HDFS is interrupt sensitive: The HDFS code frequently deadlocks
            + * or livelocks if it is interrupted.
            + */
            +public class InterruptSensitiveRestoreTest {
            +
            + private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();
            +
            + @Test
            + public void testRestoreWithInterrupt() throws Exception {
            +
            + Configuration taskConfig = new Configuration();
            + StreamConfig cfg = new StreamConfig(taskConfig);
            + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            + cfg.setStreamOperator(new StreamSource<>(new TestSource()));
            +
            + StateHandle<Serializable> lockingHandle = new InterruptLockingStateHandle();
            + StreamTaskState opState = new StreamTaskState();
            + opState.setFunctionState(lockingHandle);
            + StreamTaskStateList taskState = new StreamTaskStateList(new StreamTaskState[] { opState });
            +
            + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, taskState);
            + Task task = createTask(tdd);
            +
            + // start the task and wait until it is in "restore"
            + task.startTaskThread();
            + IN_RESTORE_LATCH.await();
            +
            + // trigger cancellation and signal to continue
            + task.cancelExecution();
            +
            + task.getExecutingThread().join(30000);
            — End diff –

            I think there is a race in the test: if the task is interrupted before entering the first wait, it will just wait there until the 30 seconds are over and the test will fail.
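
            One common way to rule out the lost-wakeup variant of this race is to re-check a condition flag under the lock instead of relying on a single `wait()`. The sketch below assumes that closing the handle is what should release the stuck thread; `InterruptIgnoringWaiter` is a hypothetical name, and the swallowed interrupt mimics the HDFS behavior from the test:

```java
import java.io.Closeable;

/** Hypothetical waiter that ignores interrupts but is reliably released by close(). */
final class InterruptIgnoringWaiter implements Closeable {

    private boolean closed; // guarded by "this"

    /** Blocks, ignoring interrupts, until close() is called (possibly before this method runs). */
    synchronized void awaitClose() {
        while (!closed) { // re-checking the flag means a close() that ran first is never missed
            try {
                wait();
            } catch (InterruptedException ignored) {
                // mimic the HDFS client: swallow the interrupt and keep waiting
            }
        }
    }

    @Override
    public synchronized void close() {
        closed = true;
        notifyAll();
    }
}
```

            If `close()` runs before `awaitClose()`, the loop sees `closed == true` and never waits, so a test built on this pattern cannot hang for the full 30 seconds on that path.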

            githubbot ASF GitHub Bot added a comment -

            Github user uce commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70946878

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java —
            @@ -54,6 +56,8 @@

            /** The serialized data of the state key/value pairs */
            private final byte[] data;
            +
            + private transient boolean closed;
            — End diff –

            Missing `volatile`?

            githubbot ASF GitHub Bot added a comment -

            Github user uce commented on the issue:

            https://github.com/apache/flink/pull/2252

            Looks very good! The test failures seem unrelated:

              • ClientTest: https://issues.apache.org/jira/browse/FLINK-4220 (newly created)
              • JobManagerHACheckpointRecoveryITCase: https://issues.apache.org/jira/browse/FLINK-3516 (known instability)
              • Travis Scala dependency issue

            The added tests and refactorings are very readable.

            I think this is good to merge modulo some minor inline comments.

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70956526

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java —
            @@ -54,6 +56,8 @@

            /** The serialized data of the state key/value pairs */
            private final byte[] data;
            +
            + private transient boolean closed;
            — End diff –

            I think it is not crucial to have a strict barrier here. If the reading thread eventually notices the flag, it is enough. And since volatile accesses are much more expensive, I wanted to avoid that.
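
The pattern under discussion, a `closed` flag that the restoring thread checks while it reads the snapshot, can be sketched as below. This is a hypothetical illustration: `FlagCheckedRestore`, `restore`, and the chunk loop are assumptions, not the PR's code. One deliberate difference: the sketch declares the flag `volatile`, as the review asked, so its visibility to the reading thread does not depend on any other synchronization. The PR keeps a plain `transient boolean` to avoid paying for a volatile read on every check.

```java
import java.io.IOException;

/** Hypothetical sketch: a restore loop that aborts once a "closed" flag is set. */
public class FlagCheckedRestore {

    // The PR uses a plain (non-volatile) field here; volatile is used in this
    // sketch so another thread's close() is guaranteed to become visible.
    private volatile boolean closed;

    /** Called from the canceling thread. */
    public void close() {
        closed = true;
    }

    /** Processes 'chunks' units of state, checking the flag before each one. */
    public int restore(int chunks) throws IOException {
        int restored = 0;
        for (int i = 0; i < chunks; i++) {
            if (closed) {
                throw new IOException("Snapshot was closed during restore");
            }
            restored++; // stand-in for deserializing one chunk of state
        }
        return restored;
    }
}
```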

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70956813

            — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java —
            @@ -0,0 +1,223 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.runtime.tasks;
            +
            +import org.apache.flink.api.common.ExecutionConfig;
            +import org.apache.flink.api.common.JobID;
            +import org.apache.flink.configuration.Configuration;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.runtime.blob.BlobKey;
            +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
            +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
            +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
            +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
            +import org.apache.flink.runtime.execution.ExecutionState;
            +import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
            +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
            +import org.apache.flink.runtime.filecache.FileCache;
            +import org.apache.flink.runtime.instance.ActorGateway;
            +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
            +import org.apache.flink.runtime.io.network.NetworkEnvironment;
            +import org.apache.flink.runtime.jobgraph.JobVertexID;
            +import org.apache.flink.runtime.memory.MemoryManager;
            +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
            +import org.apache.flink.runtime.state.StateHandle;
            +import org.apache.flink.runtime.taskmanager.Task;
            +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
            +import org.apache.flink.runtime.util.EnvironmentInformation;
            +import org.apache.flink.runtime.util.SerializableObject;
            +import org.apache.flink.streaming.api.TimeCharacteristic;
            +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.api.graph.StreamConfig;
            +import org.apache.flink.streaming.api.operators.StreamSource;
            +import org.apache.flink.util.SerializedValue;
            +
            +import org.junit.Test;
            +
            +import scala.concurrent.duration.FiniteDuration;
            +
            +import java.io.IOException;
            +import java.io.Serializable;
            +import java.net.URL;
            +import java.util.Collections;
            +import java.util.concurrent.TimeUnit;
            +
            +import static org.junit.Assert.*;
            +import static org.mockito.Mockito.*;
            +
            +/**
            + * This test checks that task restores that get stuck in the presence of interrupts
            + * are handled properly.
            + *
            + * In practice, reading from HDFS is interrupt sensitive: The HDFS code frequently deadlocks
            + * or livelocks if it is interrupted.
            + */
            +public class InterruptSensitiveRestoreTest {
            +
            + private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();
            +
            + @Test
            + public void testRestoreWithInterrupt() throws Exception {
            +
            + Configuration taskConfig = new Configuration();
            + StreamConfig cfg = new StreamConfig(taskConfig);
            + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            + cfg.setStreamOperator(new StreamSource<>(new TestSource()));
            +
            + StateHandle<Serializable> lockingHandle = new InterruptLockingStateHandle();
            + StreamTaskState opState = new StreamTaskState();
            + opState.setFunctionState(lockingHandle);
            + StreamTaskStateList taskState = new StreamTaskStateList(new StreamTaskState[] { opState } );
            +
            + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, taskState);
            + Task task = createTask(tdd);
            +
            + // start the task and wait until it is in "restore"
            + task.startTaskThread();
            + IN_RESTORE_LATCH.await();
            +
            + // trigger cancellation and signal to continue
            + task.cancelExecution();
            +
            + task.getExecutingThread().join(30000);
            — End diff –

            I think it should not be a race. On interruption, the thread's 'interrupt' flag will be set and, upon entering `wait()`, it should immediately throw an `InterruptedException`.

            Also, the cancellation sends periodic interrupts.
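
The "no race" argument relies on the `Object.wait()` contract: if the calling thread's interrupt status is already set, `wait()` throws `InterruptedException` instead of blocking, and the interrupt status is cleared when the exception is thrown. A minimal check of that behavior, using a hypothetical class name:

```java
/** Hypothetical demo: an interrupt that arrives before wait() is not lost. */
public class PendingInterruptDemo {

    /**
     * Sets the current thread's interrupt flag *before* calling wait() and
     * returns true if wait() threw immediately and cleared the flag.
     */
    public static boolean waitThrowsOnPendingInterrupt() {
        final Object lock = new Object();
        Thread.currentThread().interrupt(); // the interrupt arrives "early"
        synchronized (lock) {
            try {
                lock.wait(10_000); // would block for up to 10 s if the flag were ignored
                return false;
            } catch (InterruptedException e) {
                // Per the Object.wait() contract, the interrupt status is cleared here.
                return !Thread.currentThread().isInterrupted();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("wait() threw on pending interrupt: " + waitThrowsOnPendingInterrupt());
    }
}
```

The periodic interrupts mentioned above add a second safety net for code that swallows a single interrupt; the sketch covers only the pending-flag case.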

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70956828

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java —
            @@ -0,0 +1,131 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.runtime.state;
            +
            +import java.io.Closeable;
            +import java.io.IOException;
            +import java.io.Serializable;
            +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
            +
            +/**
            + * A simple base for closable handles.
            + *
            + * Offers to register a stream (or other closable object) that close calls are delegated to if
            + * the handel is closed or was already closed.
            — End diff –

            Thanks, will fix it.
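
The Javadoc above describes the core idea: a stream (or other `Closeable`) can be registered with the handle, so that closing the handle also closes the stream and can unblock a reader stuck on it, even if the handle was closed before registration. A simplified sketch follows. It is not Flink's `AbstractCloseableHandle`; `SimpleCloseableHandle`, `registerCloseable`, and `isClosed` are hypothetical names, although the sketch uses `AtomicIntegerFieldUpdater` as the imports in the diff suggest.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical, simplified closeable handle that delegates close() to a registered resource. */
public class SimpleCloseableHandle implements Closeable {

    private static final AtomicIntegerFieldUpdater<SimpleCloseableHandle> CLOSED =
            AtomicIntegerFieldUpdater.newUpdater(SimpleCloseableHandle.class, "closed");

    // 0 = open, 1 = closed; the field updater requires a volatile int.
    private volatile int closed;

    // The resource that close() calls are delegated to (guarded by 'this').
    private Closeable toClose;

    /** Registers a resource; if the handle is already closed, closes it and fails. */
    public void registerCloseable(Closeable c) throws IOException {
        if (c == null) {
            return;
        }
        synchronized (this) {
            if (closed == 0) {
                toClose = c;
                return;
            }
        }
        // Closed before registration: close the resource right away so that
        // no reader stays blocked on it.
        c.close();
        throw new IOException("Handle is already closed");
    }

    @Override
    public void close() throws IOException {
        // Only the first close() call releases the registered resource.
        if (CLOSED.compareAndSet(this, 0, 1)) {
            final Closeable c;
            synchronized (this) {
                c = toClose;
                toClose = null;
            }
            if (c != null) {
                c.close();
            }
        }
    }

    public boolean isClosed() {
        return closed != 0;
    }
}
```

Closing the registered stream from the canceling thread typically makes a reader blocked inside that stream fail with an exception, whether or not it ever checks a flag.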

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2252#discussion_r70956861

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java —
            @@ -20,18 +20,21 @@

            import org.apache.flink.core.fs.FileSystem;
            import org.apache.flink.core.fs.Path;
            +import org.apache.flink.runtime.state.AbstractCloseableHandle;
            +import org.apache.flink.runtime.state.StateObject;

            import java.io.IOException;
            +import java.io.Serializable;

            import static java.util.Objects.requireNonNull;

            /**
             * Base class for state that is stored in a file.
             */
            -public abstract class AbstractFileStateHandle implements java.io.Serializable {
            +public abstract class AbstractFileStateHandle extends AbstractCloseableHandle implements StateObject, Serializable {
            — End diff –

            True, will remove this.

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on the issue:

            https://github.com/apache/flink/pull/2252

            Thanks, I'll address your comments and merge this...

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on the issue:

            https://github.com/apache/flink/pull/2252

            Manually merged in e9f660d1ff5540c7ef829f2de5bb870b787c18b7

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen closed the pull request at:

            https://github.com/apache/flink/pull/2252

            sewen Stephan Ewen added a comment -

            Fixed in e9f660d1ff5540c7ef829f2de5bb870b787c18b7

            githubbot ASF GitHub Bot added a comment -

            Github user liuml07 commented on the issue:

            https://github.com/apache/flink/pull/2252

            Is this related to https://issues.apache.org/jira/browse/HADOOP-14214? Thanks,

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on the issue:

            https://github.com/apache/flink/pull/2252

            I think it is, yes. We worked around it in the meantime...


            People

              sewen Stephan Ewen
              rmetzger Robert Metzger
              Votes:
              0
              Watchers:
              6

              Dates

                Created:
                Updated:
                Resolved: