Flink / FLINK-26595

Improve the PostgresDialect method for getting upsert statements.


Details

    Description

      I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time,
      but the JDBC sink fails with the following error:

      CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.

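      This exception is caused by the getUpsertStatement method of PostgresDialect.
      The upsert statement it generates lists every column in the DO UPDATE SET clause, including the unique key column (id).
      As the error message indicates, MatrixDB does not allow a distribution column to be modified in ON CONFLICT ... DO UPDATE.
      The unique key columns should therefore be left out of the DO UPDATE SET clause.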

       

      I ran the following experiment to verify the modification.
      I also recompiled and repackaged flink-connector-jdbc. With the modified connector, my program no longer reports the error.

      -- 1. Create a table in MatrixDB.
      CREATE TABLE user_1 (
        id int,
        name VARCHAR(255) NOT NULL DEFAULT 'flink',
        address VARCHAR(1024),
        phone_number VARCHAR(512),
        email VARCHAR(255),
        UNIQUE(id)
      );
      
      
      -- 2. Insert a record.
      INSERT INTO user_1(id, name, address, phone_number, email) 
      VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') 
      ON CONFLICT (id) 
      DO UPDATE SET 
      id=EXCLUDED.id, 
      name=EXCLUDED.name, 
      address=EXCLUDED.address, 
      phone_number=EXCLUDED.phone_number, 
      email=EXCLUDED.email;
      
      -- 3. Executing the above INSERT statement fails with the following error:
      -- ERROR:  modification of distribution columns in OnConflictUpdate is not supported
      
      
      -- 4. If id is removed from the DO UPDATE SET clause, as in the following statement, the command executes successfully.
      INSERT INTO user_1(id, name, address, phone_number, email) 
      VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') 
      ON CONFLICT (id) 
      DO UPDATE SET 
      name=EXCLUDED.name, 
      address=EXCLUDED.address, 
      phone_number=EXCLUDED.phone_number, 
      email=EXCLUDED.email;
      

       

       

      The PostgresDialect class handles upsert statements as follows:

      // package org.apache.flink.connector.jdbc.dialect.psql
          public Optional<String> getUpsertStatement(
                  String tableName, String[] fieldNames, String[] uniqueKeyFields) {
              String uniqueColumns =
                      Arrays.stream(uniqueKeyFields)
                              .map(this::quoteIdentifier)
                              .collect(Collectors.joining(", "));
              String updateClause =
                      Arrays.stream(fieldNames)
                              .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                              .collect(Collectors.joining(", "));
              return Optional.of(
                      getInsertIntoStatement(tableName, fieldNames)
                              + " ON CONFLICT ("
                              + uniqueColumns
                              + ")"
                              + " DO UPDATE SET "
                              + updateClause);
          }
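
      The following stand-alone sketch reproduces the statement that this logic generates for the user_1 table. It is not the connector code: the class CurrentUpsertRepro and the helper insertInto are hypothetical, insertInto is a simplified stand-in for getInsertIntoStatement that emits ? placeholders, and quoteIdentifier is assumed to return the identifier unchanged (the statement in the error message shows unquoted names).

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical stand-alone class; not part of flink-connector-jdbc.
class CurrentUpsertRepro {

    // Assumption: identifiers are returned unquoted, as in the error message.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Simplified stand-in for getInsertIntoStatement, using ? placeholders.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(CurrentUpsertRepro::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Same structure as the method above: every field, including the
    // unique key fields, ends up in the DO UPDATE SET clause.
    static String currentUpsert(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(CurrentUpsertRepro::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")"
                + " DO UPDATE SET " + updateClause;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        // The printed SET clause starts with id=EXCLUDED.id, which MatrixDB rejects.
        System.out.println(currentUpsert("user_1", fields, new String[] {"id"}));
    }
}
```

      The SET clause of the printed statement begins with id=EXCLUDED.id, which is the assignment rejected in step 3 of the experiment.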
      

       

       

      To fix this problem, I propose the following change to PostgresDialect, which filters the unique key fields out of the update clause:

      // package org.apache.flink.connector.jdbc.dialect.psql
          public Optional<String> getUpsertStatement(
                  String tableName, String[] fieldNames, String[] uniqueKeyFields) {
              String uniqueColumns =
                      Arrays.stream(uniqueKeyFields)
                              .map(this::quoteIdentifier)
                              .collect(Collectors.joining(", "));
              // Exclude unique key columns from the DO UPDATE SET clause.
              List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
              String updateClause =
                      Arrays.stream(fieldNames)
                              .filter(f -> !uniqueKeyList.contains(f))
                              .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                              .collect(Collectors.joining(", "));
              return Optional.of(
                      getInsertIntoStatement(tableName, fieldNames)
                              + " ON CONFLICT ("
                              + uniqueColumns
                              + ")"
                              + " DO UPDATE SET "
                              + updateClause);
          }
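
      One case the change above does not cover: if every field is part of the unique key, the filtered update clause is empty and the statement would end in "DO UPDATE SET " with nothing after it. The sketch below is one possible way to handle that, falling back to ON CONFLICT ... DO NOTHING, which is valid PostgreSQL syntax; whether MatrixDB accepts it is an assumption I have not tested. The class UpsertSketch and the helper insertInto are hypothetical, insertInto is a simplified stand-in for getInsertIntoStatement with ? placeholders, and quoteIdentifier is assumed to return the identifier unchanged.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-alone class; not part of flink-connector-jdbc.
class UpsertSketch {

    // Assumption: identifiers are returned unquoted, as in the error message.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Simplified stand-in for getInsertIntoStatement, using ? placeholders.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    static String buildUpsert(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // A set gives constant-time lookups when filtering out key fields.
        Set<String> uniqueKeys = new HashSet<>(Arrays.asList(uniqueKeyFields));
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeys.contains(f))
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        // Nothing left to update when all fields are key fields.
        String conflictAction =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")"
                + conflictAction;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(buildUpsert("user_1", fields, new String[] {"id"}));
        System.out.println(buildUpsert("t", new String[] {"id"}, new String[] {"id"}));
    }
}
```

      For the user_1 table this yields the same shape as the statement in step 4 of the experiment, with id absent from the SET clause.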
      

            People

              hapihu wuguihu