Flink / FLINK-12725

Need to copy flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib location


    Description

I am currently working on a Flink application that uses some of the Hadoop dependencies to write data to HDFS. In a local environment it works fine; however, when I deploy the application on a cluster it throws an exception related to a compatibility issue.
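For illustration, a hypothetical minimal job of this shape (the class name and data below are my own, not taken from the actual application) is already enough to push a Hadoop Writable type through Flink's type extraction:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.hadoop.io.Text;

    // Hypothetical minimal job: using a Hadoop Writable type (Text) in a DataSet
    // makes TypeExtractor try to create a WritableTypeInfo for it.
    public class WritableJobSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Text> words = env.fromElements(new Text("hello"), new Text("flink"));
            words.print();
        }
    }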
The error message I am getting is:

    java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
        ...
      

I tried including the flink-hadoop-compatibility Maven dependency in the POM, but it is not picked up. The Flink version I am using is 1.8.0.

However, when I explicitly copy the compatibility JAR to the ${FLINK-HOME}/lib location, I do not get any exception and am able to run the Flink application successfully.

I dove into the source code and found the problem:

    package org.apache.flink.api.java.typeutils;

    public class TypeExtractor {

        /** The name of the class representing Hadoop's writable */
        private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable";
        private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";

        // visible for testing
        public static <T> TypeInformation<T> createHadoopWritableTypeInfo(Class<T> clazz) {
            checkNotNull(clazz);

            Class<?> typeInfoClass;
            try {
                typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException("Could not load the TypeInformation for the class '"
                    + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
            }

            ...
        }
    }

       

This is because `org.apache.flink.api.java.typeutils.WritableTypeInfo` (shipped in flink-hadoop-compatibility, which is bundled into the submitted job jar) is looked up via `TypeExtractor.class.getClassLoader()`, which is the `AppClassLoader`, while the submitted Flink jar is loaded by the `ParentFirstClassLoader`, a child of the `AppClassLoader`. Class-loading delegation only goes from child to parent, so the `AppClassLoader` cannot load the class from the user jar.
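To make the delegation issue concrete, here is a standalone sketch (not Flink code; the jar path is made up) that reproduces the same visibility problem:

    import java.net.URL;
    import java.net.URLClassLoader;

    // Standalone sketch of the delegation problem: a class that is only visible to a
    // child class loader cannot be found when the lookup starts at the parent.
    public class DelegationSketch {
        public static void main(String[] args) throws Exception {
            ClassLoader parent = DelegationSketch.class.getClassLoader(); // plays the role of AppClassLoader

            // Child loader that sees the (hypothetical) user jar containing WritableTypeInfo.
            URLClassLoader child = new URLClassLoader(
                    new URL[]{ new URL("file:///path/to/my-flink-job.jar") }, parent);

            // Works: the child asks the parent first, then searches its own URLs.
            Class.forName("org.apache.flink.api.java.typeutils.WritableTypeInfo", false, child);

            // Fails with ClassNotFoundException: delegation never goes downwards from parent
            // to child, which is exactly what TypeExtractor.class.getClassLoader() runs into.
            Class.forName("org.apache.flink.api.java.typeutils.WritableTypeInfo", false, parent);
        }
    }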

I'm not sure whether this is a bug, but changing the class loader to `Thread.currentThread().getContextClassLoader()` makes it work without copying the flink-hadoop-compatibility jar to the ${FLINK-HOME}/lib location.
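A minimal sketch of how that change could look inside the createHadoopWritableTypeInfo method quoted above (not an actual patch; the null fallback is my own addition for safety):

    // Sketch of the suggested change: look the type-info class up via the context class
    // loader, which per the report above can see the user jar on a cluster. Fall back to
    // TypeExtractor's own loader (the current behaviour) if no context class loader is set.
    Class<?> typeInfoClass;
    try {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = TypeExtractor.class.getClassLoader();
        }
        typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, cl);
    }
    catch (ClassNotFoundException e) {
        throw new RuntimeException("Could not load the TypeInformation for the class '"
            + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
    }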

            People

Assignee: arganzheng
Reporter: arganzheng
