FLINK-14574: flink-s3-fs-hadoop doesn't work with plugins mechanism


      Description

      As reported by a user on the mailing list:

      We've added the flink-s3-fs-hadoop library to the plugins folder and are trying to
      bootstrap state to S3 using the S3A protocol. The following exception happens
      (unless the hadoop library is put into the lib folder instead of plugins). It looks
      like the S3A filesystem is trying to use the "local" filesystem for temporary files
      and fails:
      
      java.lang.Exception: Could not write timer service of MapPartition (d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
          at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
          at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
          at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
          at org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
          at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
          at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Could not open output stream for state backend
          at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
          at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
          at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
          at org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
          at java.io.DataOutputStream.write(DataOutputStream.java:107)
          at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
          at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
          at org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
          at org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
          at org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
          at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
          at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
          at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
          at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
          at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
          ... 14 common frames omitted
      Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
          at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
          at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
          at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
          at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
          at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
          at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
          ... 32 common frames omitted
      
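      For reference, the plugin setup described above places the filesystem jar in its own
      subdirectory under plugins/, where it is loaded by a dedicated, isolated plugin class
      loader instead of the shared lib/ class path (directory and jar names below are only
      illustrative):

      flink-dist/
        lib/
          ...                                 # shared class path, parent-first
        plugins/
          s3-fs-hadoop/
            flink-s3-fs-hadoop-<version>.jar  # loaded by its own plugin class loader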

      I think the problem is caused by the org.apache.hadoop.fs.FileSystem#loadFileSystems method inside flink-s3-fs-hadoop, which uses ServiceLoader.load(FileSystem.class) to discover FileSystem implementations via Thread.currentThread().getContextClassLoader(). By that point, getContextClassLoader() probably already returns the user class loader instead of the plugin's class loader.
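      For illustration, the discovery pattern in question roughly looks like the sketch
      below. This is a simplified illustration, not the actual shaded Hadoop source; it
      assumes a plain hadoop-common FileSystem on the classpath, and the class name is
      made up:

      import java.util.ServiceLoader;

      import org.apache.hadoop.fs.FileSystem;

      public class FileSystemDiscoverySketch {
          public static void main(String[] args) {
              // ServiceLoader.load(Class) without an explicit ClassLoader falls back to
              // Thread.currentThread().getContextClassLoader(), so the set of discovered
              // FileSystem implementations depends entirely on which class loader is the
              // context class loader at the moment loadFileSystems runs.
              ServiceLoader<FileSystem> discovered = ServiceLoader.load(FileSystem.class);
              for (FileSystem fs : discovered) {
                  // Hadoop registers each discovered implementation by its scheme. If the
                  // context class loader cannot see the shaded LocalFileSystem service
                  // entry, nothing gets registered for scheme "file", which matches the
                  // "No FileSystem for scheme \"file\"" failure above.
                  System.out.println(fs.getClass().getName());
              }
          }
      }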

      We should investigate why this loadFileSystems method is called so late (after we have already restored the user's class loader) and how we can work around it. One possible direction is sketched below.
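      A hedged sketch of one possible workaround (not a confirmed fix): make sure the
      plugin's class loader is installed as the thread context class loader around any
      call path that can trigger the shaded Hadoop FileSystem discovery. The helper below
      is hypothetical and only illustrates the general pattern; all names are made up:

      public final class ContextClassLoaderUtil {

          private ContextClassLoaderUtil() {}

          /**
           * Runs the given action with the plugin's class loader as the thread context
           * class loader and restores the previous one afterwards. This is the common
           * pattern for ServiceLoader-based discovery inside isolated class loaders;
           * whether and where Flink should apply it is exactly what needs investigating.
           */
          public static <T, E extends Exception> T runWithPluginClassLoader(
                  ClassLoader pluginClassLoader,
                  SupplierWithException<T, E> action) throws E {
              Thread currentThread = Thread.currentThread();
              ClassLoader previous = currentThread.getContextClassLoader();
              currentThread.setContextClassLoader(pluginClassLoader);
              try {
                  return action.get();
              } finally {
                  currentThread.setContextClassLoader(previous);
              }
          }

          /** Minimal functional interface so the action may throw a checked exception. */
          @FunctionalInterface
          public interface SupplierWithException<T, E extends Exception> {
              T get() throws E;
          }
      }

      For example, the delegating HadoopFileSystem.create call visible in the stack trace
      above could be wrapped this way, so that the shaded FileSystem#loadFileSystems always
      runs with a class loader that can see the shaded LocalFileSystem service entry.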


      People

        Assignee: Arvid Heise (AHeise)
        Reporter: Piotr Nowojski (pnowojski)
