Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-23888

Flink azure fs doesn't work

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.13.1, 1.13.2
    • None
    • FileSystems
    • None

    Description

      A working pipeline on AWS S3 doesn't work with BlobStorage.

      Flink deployed in kubernetes with HA . The following is the configuration :

      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

      high-availability.storageDir: wasbs://***

      state.backend: rocksdb

      state.checkpoints.dir: wasb://***

      I always got LinkageError.

      I believe the issue is that azure / hadoop classes are shaded  with "org.apache.flink" pattern and always get loaded via parent not via plugin class loader.

      Snapshot of the log :

      2021-08-18 11:56:03,557 INFO  org.apache.flink.fs.azurefs.AzureFSFactory                   [] - Trying to load and instantiate Azure File System

      2021-08-18 11:56:03,747 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics java.lang.LinkageError: loader org.apache.flink.core.plugin.PluginLoader$PluginClassLoader @61bcbcce attempted duplicate class definition for org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalFileSystem. (org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalFileSystem is in unnamed module of loader org.apache.flink.core.plugin.PluginLoader$PluginClassLoader @61bcbcce, parent loader 'platform')

              at java.base/java.lang.ClassLoader.defineClass1(Native Method)

              at java.base/java.lang.ClassLoader.defineClass(Unknown Source)

              at java.base/java.security.SecureClassLoader.defineClass(Unknown Source)

              at java.base/java.net.URLClassLoader.defineClass(Unknown Source)

              at java.base/java.net.URLClassLoader$1.run(Unknown Source)

              at java.base/java.net.URLClassLoader$1.run(Unknown Source)

              at java.base/java.security.AccessController.doPrivileged(Native Method)

              at java.base/java.net.URLClassLoader.findClass(Unknown Source)

              at java.base/java.lang.ClassLoader.loadClass(Unknown Source)

              at org.apache.flink.core.plugin.PluginLoader$PluginClassLoader.loadClass(PluginLoader.java:171)

              at java.base/java.lang.ClassLoader.loadClass(Unknown Source)

              at org.apache.flink.fs.azure.common.HadoopConfigLoader.loadHadoopConfigFromFlink(HadoopConfigLoader.java:96)

              at org.apache.flink.fs.azure.common.HadoopConfigLoader.getOrLoadHadoopConfig(HadoopConfigLoader.java:82)

              at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.createInitializedAzureFS(AbstractAzureFSFactory.java:85)

              at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)

              at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)

              at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:506)

              at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)

              at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)

              at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89)

              at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76)

              at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:40)

              at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)

              at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)

              at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353)

              at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311)

              at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)

              at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)

              at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

              at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)

              at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)

              at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:85)

       

       

       

       

       

      Attachments

        1. pom.xml
          7 kB
          Liviu Firu

        Activity

          People

            Unassigned Unassigned
            lmfiru Liviu Firu
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: