Flink / FLINK-33785

TableJdbcUpsertOutputFormat does not handle DELETE records correctly when primary keys are set


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: jdbc-3.1.1
    • Fix Version/s: None
    • Component/s: Connectors / JDBC
    • Labels: None
    • Environment: Flink 1.17.1, JDBC connector 3.1.1, PostgreSQL 16.1

    Description

      Issue Description

      When the JDBC connector is used to DELETE records from a database, the records are not deleted.

      Reproduction steps

      The steps are as follows:

      • Create a table with five columns and a composite primary key. DDL in PostgreSQL:

       

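      -- The default of the id column refers to this sequence, so it must exist first.
      create sequence if not exists public.fake_id_seq;

      create table public.fake
      (
          id       bigint                 not null default nextval('fake_id_seq'::regclass),
          name     character varying(128) not null,
          age      integer,
          location character varying(256),
          birthday timestamp without time zone     default CURRENT_TIMESTAMP,
          primary key (id, name)
      );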

       

      • Insert some data into the table:
      INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
      INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
      INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
      INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
      

      • Run the Flink job:
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
          final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
          final int[] fieldTypes = {
                  Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
          };
          final String[] primaryKeys = {"id", "name"};
          InternalJdbcConnectionOptions internalJdbcConnectionOptions =
                  InternalJdbcConnectionOptions.builder()
                          .setClassLoader(Thread.currentThread().getContextClassLoader())
                          .setDriverName(Driver.class.getName())
                          .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
                          .setUsername("postgres")
                          .setPassword("postgres")
                          .setTableName("fake")
                          .setParallelism(1)
                          .setConnectionCheckTimeoutSeconds(10)
                          .setDialect(new PostgresDialect())
                          .build();
          JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
                  JdbcOutputFormat.builder()
                          .setFieldNames(fieldNames)
                          .setKeyFields(primaryKeys)
                          .setFieldTypes(fieldTypes)
                          .setOptions(internalJdbcConnectionOptions)
                          .setFlushIntervalMills(1000)
                          .setFlushMaxSize(10)
                          .setMaxRetryTimes(3)
                          .build();
      
          GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
                  new GenericJdbcSinkFunction<>(jdbcOutputFormat);
      
          Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
          // Row to delete
          Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
          Tuple2<Boolean, Row> element = Tuple2.of(false, row);
          env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
          env.execute();
      } 

      The job finishes successfully, but the record with id=1 and name=Jack is not deleted.

      Cause Analysis

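      In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option is set in the JdbcDmlOptions, the method returns an 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.

      In 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor', the method uses all the fieldNames instead of the keyFields to build the DELETE SQL statement, so the WHERE clause compares every column rather than only the primary key. The delete may then match no rows. One likely way this happens is a NULL in a non-key column (as with location in the row above), because an equality comparison with NULL never evaluates to true in SQL.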
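
      To make the difference concrete, here is a minimal sketch of how the WHERE clause changes depending on which fields are passed in. The class DeleteStatementSketch and the helper buildDelete are hypothetical and are not the connector's API; the real dialect also quotes identifiers, so the exact SQL text will differ.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class DeleteStatementSketch {

    // Hypothetical helper (not part of the connector): builds a DELETE
    // statement whose WHERE clause compares each condition field with a
    // JDBC placeholder.
    static String buildDelete(String table, String[] conditionFields) {
        String where = Arrays.stream(conditionFields)
                .map(field -> field + " = ?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + table + " WHERE " + where;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};

        // All columns in the WHERE clause: a NULL bound to "location = ?"
        // prevents the row from matching.
        System.out.println(buildDelete("fake", fieldNames));

        // Only the primary key columns: the row is identified by its key.
        System.out.println(buildDelete("fake", keyFields));
    }
}
```

      With the row from the reproduction, the first statement binds NULL to the location placeholder, which is consistent with the record not being deleted.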

      How to fix

      • Build the delete executor from the real keyFields when they are set, and fall back to fieldNames otherwise.
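
      The proposed fallback could look roughly like the sketch below. It is not the actual patch: the class DeleteConditionFieldsSketch and both helpers are hypothetical names, and it only shows how the condition fields and their positions in the row might be chosen.

```java
import java.util.Arrays;

public class DeleteConditionFieldsSketch {

    // Hypothetical helper: prefer the configured key fields and fall back
    // to all field names only when no key fields were configured.
    static String[] deleteConditionFields(String[] keyFields, String[] fieldNames) {
        if (keyFields != null && keyFields.length > 0) {
            return keyFields;
        }
        return fieldNames;
    }

    // Hypothetical helper: positions of the condition fields inside the
    // full row, so the matching values can be extracted from each record.
    static int[] indexesOf(String[] conditionFields, String[] fieldNames) {
        return Arrays.stream(conditionFields)
                .mapToInt(Arrays.asList(fieldNames)::indexOf)
                .toArray();
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};

        String[] condition = deleteConditionFields(keyFields, fieldNames);
        System.out.println(Arrays.toString(condition));
        System.out.println(Arrays.toString(indexesOf(condition, fieldNames)));
    }
}
```

      Falling back to fieldNames keeps the current behaviour for tables that declare no key, while keyed tables delete by primary key only.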

People

    • Assignee: Unassigned
    • Reporter: Bodong Liu (liubodong)