Flink / FLINK-35022

Add TypeInformed Element Converter for DynamoDbSink

Details

    Description

      Context

      DynamoDbSink, as an extension of AsyncSinkBase, depends on org.apache.flink.connector.base.sink.writer.ElementConverter to convert Flink stream objects to DynamoDB write requests, where each item is represented as Map<String, AttributeValue> [1].

      AttributeValue is the wrapper for the object format that DynamoDB understands, in which every value carries a type descriptor, as in
      {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.
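      To make the typed format concrete, here is a minimal sketch that builds the item above. It uses plain Java maps as a stand-in for the SDK's AttributeValue so that it runs without dependencies. The class name TypedAttributeSketch and the helpers s, n and m are hypothetical and only mirror the S/N/M type descriptors; real code would use AttributeValue [1] instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for AttributeValue: every value is a single-entry map
// from a DynamoDB type descriptor ("S", "N", "M") to its payload.
class TypedAttributeSketch {

    // String attribute: {"S": value}
    static Map<String, Object> s(String value) {
        return Map.of("S", value);
    }

    // Number attribute: numbers are carried as strings, e.g. {"N": "35"}
    static Map<String, Object> n(Number value) {
        return Map.of("N", value.toString());
    }

    // Map attribute: {"M": {...nested typed attributes...}}
    static Map<String, Object> m(Map<String, Map<String, Object>> fields) {
        return Map.of("M", fields);
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> person = new LinkedHashMap<>();
        person.put("Name", s("Joe"));
        person.put("Age", n(35));
        // Prints the nested, type-tagged structure using Java's Map.toString().
        System.out.println(m(person));
    }
}
```

      Every leaf has to be wrapped by hand with the right descriptor, which is the repetitive part that a type-informed converter is meant to remove.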

      Since TypeInformation is already natively supported in Flink, many implementations of the DynamoDB ElementConverter are just boilerplate.
      For example, consider the following POJO:

      "Simple POJO Element Conversion"
      public class Order {
          String id;
          int quantity;
          double total;
      }
      

      The converter for it must be implemented as follows:

      "Simple POJO DDB Element Converter"
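      import org.apache.flink.api.connector.sink2.Sink;
      import org.apache.flink.api.connector.sink2.SinkWriter;
      import org.apache.flink.connector.base.sink.writer.ElementConverter;
      import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
      import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
      import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
      import java.util.HashMap;
      import java.util.Map;

      public class SimplePojoElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {

          @Override
          public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
              // Every field is wrapped by hand in an AttributeValue of the matching DynamoDB type.
              Map<String, AttributeValue> itemMap = new HashMap<>();
              itemMap.put("id", AttributeValue.builder().s(order.id).build());
              // DynamoDB numbers (N) are passed as strings.
              itemMap.put("quantity", AttributeValue.builder().n(String.valueOf(order.quantity)).build());
              itemMap.put("total", AttributeValue.builder().n(String.valueOf(order.total)).build());
              return DynamoDbWriteRequest.builder()
                      .setType(DynamoDbWriteRequestType.PUT)
                      .setItem(itemMap)
                      .build();
          }

          @Override
          public void open(Sink.InitContext context) {
              // No initialization required.
          }
      }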
      

      While this is not much work, it is a fairly common case in Flink, and the implementation requires new users to have a fair knowledge of the DynamoDB data model.

      Proposal

      Introduce DynamoDbTypeInformedElementConverter as follows:

      "TypeInformedElementConverter"

      public class DynamoDbTypeInformedElementConverter<InputT> implements ElementConverter<InputT, DynamoDbWriteRequest> {
          DynamoDbTypeInformedElementConverter(CompositeType<InputT> typeInfo);

          // Pseudocode: the AttributeValue conversion is selected from the type information.
          public DynamoDbWriteRequest convertElement(input) {
              switch (this.typeInfo) {
                  case BasicTypeInfo.STRING_TYPE_INFO: input -> AttributeValue.fromS(input.toString())
                  case BasicTypeInfo.SHORT_TYPE_INFO:
                  case BasicTypeInfo.INT_TYPE_INFO: input -> AttributeValue.fromN(input.toString())
                  case TupleTypeInfo: input -> AttributeValue.fromL(convertTuple(input))
                  .....
              }
          }
      }

      // User code
      public static void main(String[] args) {
          // TypeInformation.of returns TypeInformation<Order>, hence the cast to CompositeType.
          DynamoDbTypeInformedElementConverter<Order> elementConverter =
                  new DynamoDbTypeInformedElementConverter<>((CompositeType<Order>) TypeInformation.of(Order.class));
          DdbSink.setElementConverter(elementConverter);
      }
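      The idea behind the proposal can be tried out without Flink or the AWS SDK. The sketch below is not the proposed implementation: it swaps in Java reflection for Flink's TypeInformation and single-entry maps for AttributeValue, and all names in it (TypeDrivenConverterSketch, toAttribute, toItem) are hypothetical. It only illustrates how an item can be derived from a POJO's field types instead of a hand-written converter.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: reflection stands in for TypeInformation and
// single-entry maps stand in for AttributeValue.
class TypeDrivenConverterSketch {

    // Same shape as the Order POJO from the description, with sample values.
    static class Order {
        String id = "order-1";
        int quantity = 3;
        double total = 9.5;
    }

    // Picks the DynamoDB type descriptor from the runtime type of the value.
    static Map<String, Object> toAttribute(Object value) {
        if (value == null) {
            return Map.of("NULL", true);
        } else if (value instanceof String) {
            return Map.of("S", value);
        } else if (value instanceof Number) {
            return Map.of("N", value.toString());
        } else if (value instanceof Boolean) {
            return Map.of("BOOL", value);
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }

    // Converts every instance field of the POJO; no per-class converter is needed.
    static Map<String, Map<String, Object>> toItem(Object pojo) {
        Map<String, Map<String, Object>> item = new LinkedHashMap<>();
        for (Field field : pojo.getClass().getDeclaredFields()) {
            if (field.isSynthetic() || Modifier.isStatic(field.getModifiers())) {
                continue; // skip compiler-generated and static fields
            }
            try {
                field.setAccessible(true);
                item.put(field.getName(), toAttribute(field.get(pojo)));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return item;
    }

    public static void main(String[] args) {
        System.out.println(toItem(new Order()));
    }
}
```

      A converter built on TypeInformation would resolve the per-field conversion once at construction time rather than inspecting each record, which is one reason to prefer it over per-record reflection.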
      
      

      We will start by supporting all POJO, basic, Tuple and Array TypeInformation, which should be enough to cover all DDB-supported types (s, n, bool, b, ss, ns, bs, bools, m, l).
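      One possible reading of how array and tuple-like values could map onto the set and list types is sketched below. The mapping is an assumption, not something the proposal fixes: byte[] to b, String[] to ss, boxed numeric arrays to ns, and any other object array to l. As before, plain maps stand in for AttributeValue and the class name ArrayMappingSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an array-to-DynamoDB-type mapping (an assumption,
// not the proposal's definitive behavior).
class ArrayMappingSketch {

    static Map<String, Object> toAttribute(Object value) {
        if (value instanceof String) {
            return Map.of("S", value);
        } else if (value instanceof Number) {
            return Map.of("N", value.toString());
        } else if (value instanceof byte[]) {
            // Binary payloads are Base64-encoded in DynamoDB's JSON form.
            return Map.of("B", Base64.getEncoder().encodeToString((byte[]) value));
        } else if (value instanceof String[]) {
            // DynamoDB sets must be non-empty and unique; not enforced here.
            return Map.of("SS", List.of((String[]) value));
        } else if (value instanceof Number[]) {
            List<String> numbers = new ArrayList<>();
            for (Number number : (Number[]) value) {
                numbers.add(number.toString());
            }
            return Map.of("NS", numbers);
        } else if (value instanceof Object[]) {
            // Mixed elements (for example the fields of a tuple) become a list.
            List<Object> elements = new ArrayList<>();
            for (Object element : (Object[]) value) {
                elements.add(toAttribute(element));
            }
            return Map.of("L", elements);
        }
        throw new IllegalArgumentException("Unsupported value: " + value);
    }

    public static void main(String[] args) {
        System.out.println(toAttribute(new String[] {"a", "b"}));
        System.out.println(toAttribute(new Object[] {"x", 1}));
    }
}
```

      Primitive arrays such as int[] are not handled by this sketch and would need their own branches.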

      [1] https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html

            People

              chalixar Ahmed Hamdy