Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: jdbc-3.1.1
Fix Version/s: None
Component/s: None
Environment:
Flink: 1.17.1
Jdbc connector: 3.1.1
Postgresql: 16.1
Description
Issue Description
When using the JDBC connector to DELETE records from the database, I found that it cannot delete records correctly.
Reproduction steps
The steps are as follows:
- Create a table with five fields and a composite primary key. The id column takes its default from a sequence, so create the sequence first. DDL in Postgres:
create sequence fake_id_seq;

create table public.fake (
    id bigint not null default nextval('fake_id_seq'::regclass),
    name character varying(128) not null,
    age integer,
    location character varying(256),
    birthday timestamp without time zone default CURRENT_TIMESTAMP,
    primary key (id, name)
);
- Insert some data into the table:
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
- Run the Flink code:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
    final int[] fieldTypes = {
        Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
    };
    final String[] primaryKeys = {"id", "name"};

    InternalJdbcConnectionOptions internalJdbcConnectionOptions =
            InternalJdbcConnectionOptions.builder()
                    .setClassLoader(Thread.currentThread().getContextClassLoader())
                    .setDriverName(Driver.class.getName())
                    .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
                    .setUsername("postgres")
                    .setPassword("postgres")
                    .setTableName("fake")
                    .setParallelism(1)
                    .setConnectionCheckTimeoutSeconds(10)
                    .setDialect(new PostgresDialect())
                    .build();

    JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
            JdbcOutputFormat.builder()
                    .setFieldNames(fieldNames)
                    .setKeyFields(primaryKeys)
                    .setFieldTypes(fieldTypes)
                    .setOptions(internalJdbcConnectionOptions)
                    .setFlushIntervalMills(1000)
                    .setFlushMaxSize(10)
                    .setMaxRetryTimes(3)
                    .build();

    GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
            new GenericJdbcSinkFunction<>(jdbcOutputFormat);

    Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
    // Row to delete
    Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
    Tuple2<Boolean, Row> element = Tuple2.of(false, row);

    env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
    env.execute();
}
Although the job executes successfully, the record with id=1 and name='Jack' is not deleted.
Cause Analysis
In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option is set in the JdbcDmlOptions, the method returns an 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.
In 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor', however, the method uses all the fieldNames instead of the keyFields to build the DELETE statement. The generated WHERE clause therefore compares every column of the row, and because 'col = NULL' never evaluates to true in SQL, a DELETE row containing a NULL field (such as location in the example above) matches nothing, and the record is silently left in place.
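For reference, the delete executor in jdbc-3.1.1 is created roughly as sketched below. This is a paraphrase rather than the verbatim connector code: the names (dmlOptions, createKeyedRowExecutor) follow the 3.1.x sources, but the body is abridged.

// Abridged sketch of TableJdbcUpsertOutputFormat#createDeleteExecutor in jdbc-3.1.1.
private JdbcBatchStatementExecutor<Row> createDeleteExecutor() {
    // Every field is treated as a key column, so the statement becomes:
    // DELETE FROM fake WHERE id = ? AND name = ? AND age = ? AND location = ? AND birthday = ?
    String[] pkNames = dmlOptions.getFieldNames();
    int[] pkFields = IntStream.range(0, pkNames.length).toArray();
    return createKeyedRowExecutor(pkFields, pkNames);
}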
How to fix
- Use the real keyFields to build the delete executor, falling back to fieldNames only when no key fields are configured (see the sketch below).
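A minimal sketch of that change, assuming JdbcDmlOptions#getKeyFields returns an Optional<String[]> as in the 3.1.x sources (illustrative, not a final patch; uses java.util.Arrays):

private JdbcBatchStatementExecutor<Row> createDeleteExecutor() {
    String[] fieldNames = dmlOptions.getFieldNames();
    // Prefer the configured key fields; fall back to all field names only
    // when no primary key was declared.
    String[] pkNames = dmlOptions.getKeyFields().orElse(fieldNames);
    // Map each key field name back to its position in the full field list.
    int[] pkFields =
            Arrays.stream(pkNames)
                    .mapToInt(Arrays.asList(fieldNames)::indexOf)
                    .toArray();
    return createKeyedRowExecutor(pkFields, pkNames);
}

With this change, the example above issues DELETE FROM fake WHERE id = ? AND name = ?, and the row (1, 'Jack') is deleted as expected.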