Details
-
Bug
-
Status: Triage Needed
-
P1
-
Resolution: Fixed
-
2.15.0
Description
I’m getting the following error when attempting to use the FileIO apis (beam-2.15.0) and integrating with AWS S3. I have setup the PipelineOptions with all the relevant AWS options, so the filesystem registry *should* be properly seeded by the time the graph is compiled and executed:
java.lang.IllegalArgumentException: No filesystem found for scheme s3 at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456) at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526) at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149) at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105) at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83) at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480) at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748)
For reference, the write code resembles this:
FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write() .via(ParquetIO.sink(schema)) .to(options.getOutputDir()). // will be something like: s3://<bucket>/<path> .withSuffix(".parquet"); records.apply(String.format("Write(%s)", options.getOutputDir()), write);
The issue does not appear to be related to ParquetIO.sink(). I am able to reliably reproduce the issue using JSON formatted records and TextIO.sink(), as well. Moreover, AvroIO is affected if withWindowedWrites() option is added.
Just trying some different knobs, I went ahead and set the following option:
write = write.withNoSpilling();
This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size. The stack trace, while very similar, reads:
java.lang.IllegalArgumentException: No filesystem found for scheme s3 at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456) at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526) at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149) at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105) at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82) at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480) at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748)
And lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):
write = write.withIgnoreWindowing();
This seemed to fix the issue altogether but aside from having to rely on a deprecated feature, there is the bigger issue of why?
In reading through some of the source, it seems a common pattern to have to manually register the pipeline options to seed the filesystem registry during the setup part of the operator lifecycle, e.g.: https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
Is it possible that I have hit upon a couple scenarios where that has not taken place?
Attachments
Issue Links
- is superceded by
-
BEAM-8577 FileSystems may have not be initialized during ResourceId deserialization
- Triage Needed
- links to