Beam / BEAM-10600

Filesystems not properly registered using ... on FlinkRunner

Details

    Description

      This seems to be very similar to this previously closed issue: https://issues.apache.org/jira/browse/BEAM-8303

      Based on when I most frequently hit the error, it appears to be related to the read side, whereas the previous issue concerned the write side.

      Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme s3
       at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
       at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
       at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1165)
       at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1121)
       at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:604)
       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:595)
       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:541)
       at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:112)
       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
       at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
       at java.lang.Thread.run(Thread.java:748)

      For reference, my read code resembles this (though I will admit, there's additional complexity that could be related):

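      // FileIO.matchAll() consumes a PCollection of file patterns;
      // FileIO.match() can only be applied at the pipeline root.
      pipeline.apply(Create.of("s3://bucket/prefix1", "s3://bucket/prefix2"))
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches());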
      

      Looking at the PR (https://github.com/apache/beam/pull/9688) associated with the last issue, it seems the fix was to initialize FileSystems in a number of different TransformTranslators. Is it possible that the PR did not cover all cases, or that a new case has been introduced since?
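
      To make the suspected failure mode concrete, here is a minimal, self-contained sketch. It is a toy model, not Beam's actual FileSystems class: SchemeRegistryModel, registerDefaults, decodeWithInit and decodeWithoutInit are hypothetical names used only for illustration. The assumption is that the scheme registry is per-JVM state that stays empty until some entry point initializes it, so any code path that decodes a resource without initializing first fails the same way as the trace above.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of a per-JVM, scheme-keyed filesystem registry (NOT Beam's implementation). */
final class SchemeRegistryModel {
    // Empty until an entry point registers schemes, as on a freshly started worker JVM.
    private static final Map<String, String> REGISTRY = new ConcurrentHashMap<>();

    /** Hypothetical stand-in for the initialization the translators are meant to perform. */
    public static void registerDefaults() {
        REGISTRY.put("file", "LocalFileSystem");
        REGISTRY.put("s3", "S3FileSystem");
    }

    /** Clears the registry so the uninitialized state can be reproduced. */
    public static void reset() {
        REGISTRY.clear();
    }

    /** Resolves the filesystem for a resource spec by its URI scheme. */
    public static String lookup(String spec) {
        String scheme = URI.create(spec).getScheme();
        if (scheme == null) {
            scheme = "file"; // assumption: treat scheme-less specs as local paths
        }
        String fileSystem = REGISTRY.get(scheme);
        if (fileSystem == null) {
            throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
        }
        return fileSystem;
    }

    /** Entry point that initializes the registry before decoding. */
    public static String decodeWithInit(String spec) {
        registerDefaults();
        return lookup(spec);
    }

    /** Entry point that was missed: it decodes without initializing first. */
    public static String decodeWithoutInit(String spec) {
        return lookup(spec);
    }

    public static void main(String[] args) {
        reset();
        try {
            decodeWithoutInit("s3://bucket/prefix1/part-0");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // No filesystem found for scheme s3
        }
        System.out.println(decodeWithInit("s3://bucket/prefix1/part-0")); // S3FileSystem
    }
}
```

      If the real registry behaves like this model, then every operator that can deserialize a file resource on a worker needs the initialization, and a single uncovered path would be enough to reproduce the exception.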

          People

            Assignee: Unassigned
            Reporter: Peter Georgantas (pgeorgantas)