  1. Flink
  2. FLINK-23122

Provide dynamic converter registration

Details

    Description

      Background:

      Type conversion is at the core of exchanging data between Flink and external data sources. Flink currently provides type conversion per connector, so the conversion logic is scattered across the individual connector implementations. This causes three problems. First, the logic is hard to reuse across the Flink codebase. Second, data sources are diverse, so the existing conversions need to be extended, but they cannot be extended dynamically. Third, the core conversion logic needs to be reused in multiple projects, which calls for abstracting it into a unified mechanism. The proposal is that applications depend on a single type conversion system, and that individual subcomponents can dynamically register additional conversions.

      1. ConvertServiceRegister: provides registration and lookup of conversion services.

      public interface ConvertServiceRegister {
      
          void register(ConversionService conversionService);
      
          void register(ConversionServiceFactory conversionServiceFactory);
      
          void register(ConversionServiceSet conversionServiceSet);
      
          Collection<ConversionService> convertServices();
      
          Collection<ConversionService> convertServices(String group);
      
          Collection<ConversionServiceSet> convertServiceSets();
      
          Collection<ConversionServiceSet> convertServiceSets(String group);
      }
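
A minimal in-memory sketch of the lookup side of such a register may help make the intent concrete. It is not the proposed implementation: `RegistrySketch`, `InMemoryRegister`, and the trimmed-down `ConversionService` are hypothetical stand-ins, and the sketch assumes that the `group` argument is matched against each service's `tags()`, which the issue does not state.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

public class RegistrySketch {

    // Hypothetical, trimmed-down stand-in for the proposed ConversionService;
    // only the part needed for lookup is kept.
    interface ConversionService {
        Set<String> tags();
    }

    // Minimal in-memory register.
    // Assumption: "group" is matched against the tags of each service.
    static class InMemoryRegister {
        // Copy-on-write list so lookups stay safe while services are registered.
        private final List<ConversionService> services = new CopyOnWriteArrayList<>();

        void register(ConversionService service) {
            services.add(service);
        }

        // Returns a snapshot of all registered services.
        Collection<ConversionService> convertServices() {
            return new ArrayList<>(services);
        }

        // Returns the services whose tags contain the given group.
        Collection<ConversionService> convertServices(String group) {
            List<ConversionService> result = new ArrayList<>();
            for (ConversionService service : services) {
                if (service.tags().contains(group)) {
                    result.add(service);
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        InMemoryRegister register = new InMemoryRegister();
        register.register(() -> Set.of("jdbc"));
        register.register(() -> Set.of("jdbc", "hbase"));
        System.out.println(register.convertServices("hbase").size()); // prints 1
    }
}
```

A copy-on-write list is used here only because registration is expected to be rare compared to lookup; any thread-safe collection would serve the same purpose.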
      

      2. ConversionService: provides the conversion implementation.

      public interface ConversionService<S, T> extends Order {
      
          Set<String> tags();
      
          boolean canConvert(TypeInformationHolder source, TypeInformationHolder target)
                  throws ConvertException;
      
          Object convert(
                  TypeInformationHolder sourceType,
                  Object source,
                  TypeInformationHolder targetType,
                  Object defaultValue,
                  boolean nullable)
                  throws ConvertException;
      }
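
As an illustration of what one implementation might look like, the sketch below converts a `String` to an `Integer`. Everything outside the method shapes is an assumption: `TypeInformationHolder` and `ConvertException` are hypothetical stand-ins (the issue does not define them, and the exception is made unchecked here for brevity), and the handling of `defaultValue` and `nullable` is one plausible reading rather than the specified behavior.

```java
public class StringToIntSketch {

    // Hypothetical stand-in; unchecked here only to keep the sketch short.
    static class ConvertException extends RuntimeException {
        ConvertException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in that simply wraps a Java class.
    static class TypeInformationHolder {
        final Class<?> type;

        TypeInformationHolder(Class<?> type) {
            this.type = type;
        }
    }

    // Assumed null handling: a null source yields defaultValue when one is given,
    // null when nullable is true, and an exception otherwise.
    static class StringToIntService {

        boolean canConvert(TypeInformationHolder source, TypeInformationHolder target) {
            return source.type == String.class && target.type == Integer.class;
        }

        Object convert(
                TypeInformationHolder sourceType,
                Object source,
                TypeInformationHolder targetType,
                Object defaultValue,
                boolean nullable) {
            if (!canConvert(sourceType, targetType)) {
                throw new ConvertException("unsupported conversion");
            }
            if (source == null) {
                if (defaultValue != null) {
                    return defaultValue;
                }
                if (nullable) {
                    return null;
                }
                throw new ConvertException("null value for non-nullable target");
            }
            try {
                return Integer.valueOf(((String) source).trim());
            } catch (NumberFormatException e) {
                throw new ConvertException("not an integer: " + source);
            }
        }
    }

    public static void main(String[] args) {
        TypeInformationHolder string = new TypeInformationHolder(String.class);
        TypeInformationHolder integer = new TypeInformationHolder(Integer.class);
        StringToIntService service = new StringToIntService();
        System.out.println(service.convert(string, " 42 ", integer, 0, false)); // prints 42
        System.out.println(service.convert(string, null, integer, 0, false));   // prints 0
    }
}
```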
      

      3. ConversionServiceFactory: creates a conversion service for a given target.

      public interface ConversionServiceFactory<T> extends Order {
      
          Set<String> tags();
      
          ConversionService<?, T> getConversionService(T target) throws ConvertException;
      }
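
The factory idea can be sketched as a lookup from a target description to a converter. The sketch below is an assumption-heavy simplification: `FactorySketch` and `StringSourceFactory` are hypothetical names, the target `T` is modeled as a plain `Class<?>`, and `java.util.function.Function` stands in for the returned `ConversionService`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class FactorySketch {

    // Hypothetical factory that hands out String-to-target converters.
    // Assumption: the target is described by its Java class.
    static class StringSourceFactory {
        private final Map<Class<?>, Function<String, Object>> parsers = new HashMap<>();

        StringSourceFactory() {
            parsers.put(Integer.class, Integer::valueOf);
            parsers.put(Long.class, Long::valueOf);
            parsers.put(Boolean.class, Boolean::valueOf);
        }

        // Returns a converter for the requested target, or fails if none is known.
        Function<String, Object> getConversionService(Class<?> target) {
            Function<String, Object> parser = parsers.get(target);
            if (parser == null) {
                throw new IllegalArgumentException("no converter for " + target.getName());
            }
            return parser;
        }
    }

    public static void main(String[] args) {
        StringSourceFactory factory = new StringSourceFactory();
        System.out.println(factory.getConversionService(Long.class).apply("9000"));    // prints 9000
        System.out.println(factory.getConversionService(Boolean.class).apply("true")); // prints true
    }
}
```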
      

      4. ConversionServiceSet: manages a group of conversion services.

      public interface ConversionServiceSet extends Loadable {
      
          Set<String> tags();
      
          Collection<ConversionService> conversionServices();
      
          boolean support(TypeInformationHolder source, TypeInformationHolder target)
                  throws ConvertException;
      
          Object convert(
                  String name,
                  TypeInformationHolder typeInformationHolder,
                  Object value,
                  TypeInformationHolder type,
                  Object defaultValue,
                  boolean nullable)
                  throws ConvertException;
      }
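
One way a set could dispatch to its members is to try them in priority order and use the first one that accepts the type pair. The sketch below illustrates that idea only; `ServiceSetSketch`, `Member`, and `SimpleServiceSet` are hypothetical names, `Class<?>` stands in for `TypeInformationHolder`, and it assumes that a lower `order()` value means higher priority, since the issue does not define `Order`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class ServiceSetSketch {

    // Hypothetical, simplified member of a set.
    interface Member {
        int order();

        boolean canConvert(Class<?> source, Class<?> target);

        Object convert(Object value);
    }

    static class SimpleServiceSet {
        private final List<Member> members = new ArrayList<>();

        void add(Member member) {
            members.add(member);
            // Assumption: a lower order value means higher priority.
            members.sort(Comparator.comparingInt(Member::order));
        }

        boolean support(Class<?> source, Class<?> target) {
            return members.stream().anyMatch(m -> m.canConvert(source, target));
        }

        // Uses the first member, in priority order, that accepts the type pair.
        Object convert(Object value, Class<?> target) {
            for (Member member : members) {
                if (member.canConvert(value.getClass(), target)) {
                    return member.convert(value);
                }
            }
            throw new IllegalArgumentException("no member converts to " + target.getName());
        }
    }

    // Helper that builds a member for an exact source/target class pair.
    static Member member(int order, Class<?> src, Class<?> dst, Function<Object, Object> fn) {
        return new Member() {
            public int order() {
                return order;
            }

            public boolean canConvert(Class<?> source, Class<?> target) {
                return source == src && target == dst;
            }

            public Object convert(Object value) {
                return fn.apply(value);
            }
        };
    }

    public static void main(String[] args) {
        SimpleServiceSet set = new SimpleServiceSet();
        set.add(member(10, String.class, Integer.class, v -> Integer.valueOf((String) v)));
        set.add(member(1, String.class, Integer.class, v -> ((String) v).length()));
        // The order-1 member wins, so the string length is returned.
        System.out.println(set.convert("123", Integer.class)); // prints 3
    }
}
```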
      

          People

            Assignee: Unassigned
            Reporter: Jack-Lee (lqjacklee)
            Votes: 0
            Watchers: 3
