Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-30504

Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with other engines

    XMLWordPrintableJSON

Details

    Description

      Currently when writing Table Store tables on OSS with other engines (for example Spark), the following exception will occur.

      22/12/23 17:54:12 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3) (core-1-1.c-c9f1b761c8946269.cn-huhehaote.emr.aliyuncs.com executor 2): java.lang.RuntimeException: Failed to find latest snapshot id
        at org.apache.flink.table.store.file.utils.SnapshotManager.latestSnapshotId(SnapshotManager.java:81)
        at org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.scanExistingFileMetas(AbstractFileStoreWrite.java:87)
        at org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:113)
        at org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.createWriter(AbstractFileStoreWrite.java:227)
        at org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.lambda$getWriter$1(AbstractFileStoreWrite.java:217)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1128)
        at org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.getWriter(AbstractFileStoreWrite.java:217)
        at org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:106)
        at org.apache.flink.table.store.table.sink.TableWriteImpl.write(TableWriteImpl.java:63)
        at org.apache.flink.table.store.spark.SparkWrite$WriteRecords.call(SparkWrite.java:124)
        at org.apache.flink.table.store.spark.SparkWrite$WriteRecords.call(SparkWrite.java:105)
        at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
        at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$3(PairRDDFunctions.scala:752)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:237)
        at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:220)
        at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1431)
        at org.apache.spark.rdd.RDD.$anonfun$reduce$2(RDD.scala:1097)
        at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
      Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oss'. The scheme is directly supported by Flink through the following plugin(s): flink-oss-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
        at org.apache.flink.table.store.file.utils.SnapshotManager.findLatest(SnapshotManager.java:164)
        at org.apache.flink.table.store.file.utils.SnapshotManager.latestSnapshotId(SnapshotManager.java:79)
        ... 30 more
      

      Attachments

        Issue Links

          Activity

            People

              TsReaper Caizhi Weng
              TsReaper Caizhi Weng
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: