Flink / FLINK-26470

[Java][TypeExtractor] Missing type information in POJO types of some types (List, Map, UUID)


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.13.2
    • Fix Version/s: None
    • Component/s: API / Core

    Description

      Problem:

Basic collections (List, Map) and some custom types such as UUID are not compatible with Flink's POJO serialization.

Here is the exception:

       

      Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
      java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
          at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
          at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:346)
      [... nothing interesting ...]

       

      Explanation:

As the documentation says, we should not use Kryo in production since it is not performant.

To stop using Kryo and rely on Flink's native POJO serialization, we do this:

env.getConfig().disableGenericTypes();

       

But POJOs have to meet some requirements.
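For illustration, here is a minimal sketch that reproduces the exception above (the Order class and its fields are made up for this example):

import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ListFieldRepro {

    // A valid Flink POJO except for the List field: the TypeExtractor maps
    // "items" to GenericTypeInfo, i.e. the Kryo fallback.
    public static class Order {
        public String id;
        public List<String> items;

        public Order() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes(); // forbid the Kryo fallback

        // Serializer creation for Order fails with the UnsupportedOperationException
        // above, because the "items" field is treated as a generic type.
        env.fromElements(new Order()).print();

        env.execute("list-field-repro");
    }
}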

The following code comes from flink-core v1.13.2 (it looks the same in v1.14.4):

      private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
              Class<OUT> clazz,
              List<Type> typeHierarchy,
              ParameterizedType parameterizedType,
              TypeInformation<IN1> in1Type,
              TypeInformation<IN2> in2Type) {
          checkNotNull(clazz);
      
          // check if type information can be produced using a factory
          final TypeInformation<OUT> typeFromFactory =
                  createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
          if (typeFromFactory != null) {
              return typeFromFactory;
          }
      
          // Object is handled as generic type info
          if (clazz.equals(Object.class)) {
              return new GenericTypeInfo<>(clazz);
          }
      
          // Class is handled as generic type info
          if (clazz.equals(Class.class)) {
              return new GenericTypeInfo<>(clazz);
          }
      
          // recursive types are handled as generic type info
          if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
              return new GenericTypeInfo<>(clazz);
          }
      
          // check for arrays
          if (clazz.isArray()) {
      
              // primitive arrays: int[], byte[], ...
              PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                      PrimitiveArrayTypeInfo.getInfoFor(clazz);
              if (primitiveArrayInfo != null) {
                  return primitiveArrayInfo;
              }
      
              // basic type arrays: String[], Integer[], Double[]
              BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
              if (basicArrayInfo != null) {
                  return basicArrayInfo;
              }
      
              // object arrays
              else {
                  TypeInformation<?> componentTypeInfo =
                          createTypeInfoWithTypeHierarchy(
                                  typeHierarchy, clazz.getComponentType(), in1Type, in2Type);
      
                  return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
              }
          }
      
          // check for writable types
          if (isHadoopWritable(clazz)) {
              return createHadoopWritableTypeInfo(clazz);
          }
      
          // check for basic types
          TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
          if (basicTypeInfo != null) {
              return basicTypeInfo;
          }
      
          // check for SQL time types
          TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
          if (timeTypeInfo != null) {
              return timeTypeInfo;
          }
      
          // check for subclasses of Value
          if (Value.class.isAssignableFrom(clazz)) {
              Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
              return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
          }
      
          // check for subclasses of Tuple
          if (Tuple.class.isAssignableFrom(clazz)) {
              if (clazz == Tuple0.class) {
                  return new TupleTypeInfo(Tuple0.class);
              }
              throw new InvalidTypesException(
                      "Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
          }
      
          // check for Enums
          if (Enum.class.isAssignableFrom(clazz)) {
              return new EnumTypeInfo(clazz);
          }
      
          // special case for POJOs generated by Avro.
          if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
              return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
          }
      
          if (Modifier.isInterface(clazz.getModifiers())) {
              // Interface has no members and is therefore not handled as POJO
              return new GenericTypeInfo<>(clazz);
          }
      
          try {
              Type t = parameterizedType != null ? parameterizedType : clazz;
              TypeInformation<OUT> pojoType =
                      analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
              if (pojoType != null) {
                  return pojoType;
              }
          } catch (InvalidTypesException e) {
              if (LOG.isDebugEnabled()) {
                  LOG.debug(
                          "Unable to handle type " + clazz + " as POJO. Message: " + e.getMessage(),
                          e);
              }
              // ignore and create generic type info
          }
      
          // return a generic type
          return new GenericTypeInfo<>(clazz);
      } 

       

Only the following types are compatible (i.e. not treated as GenericType):

• all custom POJOs with a @TypeInfo annotation
• arrays
• all Hadoop Writable types
• basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
• primitive types
• SQL time types
• all types implementing org.apache.flink.types.Value
• all subclasses of org.apache.flink.api.java.tuple.Tuple
• enums
• Avro-generated types
• nested POJOs

But not:

• List and Map, since they fall into the Modifier.isInterface(clazz.getModifiers()) branch (a partial workaround is sketched below)
• UUID, since the POJO analysis fails (no getters/setters for its fields) and it falls back to a generic type
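
The partial workaround that exists today is to declare the type information explicitly when the List or Map is the output type of an operator, using the existing Types.LIST / Types.MAP helpers. This does not help when the collection is a field inside a POJO, which is exactly the gap described here. A sketch:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitListTypeExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();

        DataStream<List<String>> lists =
                env.fromElements("a,b", "c")
                        .map(s -> Arrays.asList(s.split(",")))
                        // Without this hint the lambda's return type is erased and
                        // the extractor would fall back to a generic (Kryo) type.
                        .returns(Types.LIST(Types.STRING));

        lists.print();
        env.execute("explicit-list-type");
    }
}

Types.MAP(keyType, valueType) works the same way for Map outputs.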

       

By the way, we can't register our own type information factory globally, which would really be the perfect world (the @TypeInfo documentation says that there is a TypeExtractor#registerFactory(Type, Class), but there isn't).
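
For completeness, the only factory mechanism that does exist today is the @TypeInfo annotation together with a TypeInfoFactory, and it only works on classes we own. A sketch with a hypothetical wrapper that stores the UUID as two longs, precisely because java.util.UUID itself cannot be annotated or described:

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical wrapper type: since we own the class, we can attach a factory to it.
@TypeInfo(UuidWrapper.Factory.class)
public class UuidWrapper {

    public long mostSigBits;
    public long leastSigBits;

    public UuidWrapper() {}

    public UUID toUuid() {
        return new UUID(mostSigBits, leastSigBits);
    }

    // Factory consulted by the TypeExtractor before any other analysis.
    public static class Factory extends TypeInfoFactory<UuidWrapper> {
        @Override
        public TypeInformation<UuidWrapper> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("mostSigBits", Types.LONG);
            fields.put("leastSigBits", Types.LONG);
            return Types.POJO(UuidWrapper.class, fields);
        }
    }
}

Nothing comparable is possible for java.util.UUID itself, hence the proposal below.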

       

       

How to fix it?

ListTypeInfo and MapTypeInfo already exist, and they could simply be used by the method TypeExtractor.privateGetForClass(...).
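
A rough sketch, just to illustrate the idea (this is not actual Flink code, and it assumes the parameterizedType argument carries the element types); such a branch could sit right before the Modifier.isInterface(...) check:

// Illustrative sketch only, not actual Flink code.
if (List.class.isAssignableFrom(clazz) && parameterizedType != null) {
    Type elementType = parameterizedType.getActualTypeArguments()[0];
    TypeInformation<?> elementInfo =
            createTypeInfoWithTypeHierarchy(typeHierarchy, elementType, in1Type, in2Type);
    return (TypeInformation<OUT>) new ListTypeInfo<>(elementInfo);
}

if (Map.class.isAssignableFrom(clazz) && parameterizedType != null) {
    Type[] keyAndValue = parameterizedType.getActualTypeArguments();
    TypeInformation<?> keyInfo =
            createTypeInfoWithTypeHierarchy(typeHierarchy, keyAndValue[0], in1Type, in2Type);
    TypeInformation<?> valueInfo =
            createTypeInfoWithTypeHierarchy(typeHierarchy, keyAndValue[1], in1Type, in2Type);
    return (TypeInformation<OUT>) new MapTypeInfo<>(keyInfo, valueInfo);
}

ListTypeInfo and MapTypeInfo live in org.apache.flink.api.java.typeutils, so no new serializers would be needed.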

For UUID, we could create a customizable TypeInformationFactory registry that holds all the specific types which do not fit the native Flink type system. The other option is to add it as a basic type.
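
In other words, something along the lines of the registration call that the @TypeInfo javadoc already hints at (hypothetical API, it does not exist today; UuidTypeInfoFactory is a made-up name):

// Hypothetical API sketch, not an existing Flink method: a global registry that
// maps a type we do not own to a TypeInfoFactory we provide.
TypeExtractor.registerFactory(UUID.class, UuidTypeInfoFactory.class);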

       

I can help and contribute!

       

Thanks!


          People

Assignee: Unassigned
Reporter: Antoine Michaud