Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
1.13.1
None
Description
I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time, but I encountered an error.
The error message is as follows:
CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
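This exception is caused by the getUpsertStatement method of PostgresDialect, which generates an upsert statement that MatrixDB rejects.
The DO UPDATE SET clause assigns every column, including the unique key columns named in ON CONFLICT.
The unique key columns should be left out of the update clause.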
After recompiling and packaging flink-connector-jdbc with this change, my program no longer reported the error.
I also ran the following experiment to verify the modification:
-- 1. Create a table in MatrixDB.
CREATE TABLE user_1 (
    id int,
    name VARCHAR(255) NOT NULL DEFAULT 'flink',
    address VARCHAR(1024),
    phone_number VARCHAR(512),
    email VARCHAR(255),
    UNIQUE(id)
);

-- 2. Insert a record.
INSERT INTO user_1(id, name, address, phone_number, email)
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com')
ON CONFLICT (id) DO UPDATE SET
    id=EXCLUDED.id,
    name=EXCLUDED.name,
    address=EXCLUDED.address,
    phone_number=EXCLUDED.phone_number,
    email=EXCLUDED.email;

-- 3. Executing the above insert statement results in the following error:
-- ERROR: modification of distribution columns in OnConflictUpdate is not supported

-- 4. If id is removed from the DO UPDATE SET clause, the statement executes successfully.
INSERT INTO user_1(id, name, address, phone_number, email)
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com')
ON CONFLICT (id) DO UPDATE SET
    name=EXCLUDED.name,
    address=EXCLUDED.address,
    phone_number=EXCLUDED.phone_number,
    email=EXCLUDED.email;
The PostgresDialect class handles upsert statements as follows:
// package org.apache.flink.connector.jdbc.dialect.psql
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    String updateClause =
            Arrays.stream(fieldNames)
                    .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    return Optional.of(
            getInsertIntoStatement(tableName, fieldNames)
                    + " ON CONFLICT ("
                    + uniqueColumns
                    + ")"
                    + " DO UPDATE SET "
                    + updateClause);
}
To fix this problem, make the following changes to PostgresDialect:
// package org.apache.flink.connector.jdbc.dialect.psql
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    // Requires java.util.List in addition to the existing imports.
    List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
    String updateClause =
            Arrays.stream(fieldNames)
                    // Skip the unique key columns so they are never reassigned.
                    .filter(f -> !uniqueKeyList.contains(f))
                    .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    return Optional.of(
            getInsertIntoStatement(tableName, fieldNames)
                    + " ON CONFLICT ("
                    + uniqueColumns
                    + ")"
                    + " DO UPDATE SET "
                    + updateClause);
}
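The change above can be tried outside of Flink with a small standalone sketch. Everything in it is an assumption made for illustration, not the connector's actual code: the class name UpsertSketch, the helpers quote and insertInto (simplified stand-ins for quoteIdentifier and getInsertIntoStatement that use plain ? placeholders and no quoting), and the DO NOTHING fallback. The fallback is not part of the proposed patch; it is one possible way to avoid emitting an empty DO UPDATE SET clause when every column is a unique key column.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class UpsertSketch {

    // Hypothetical stand-in for the dialect's quoteIdentifier; leaves names unquoted.
    static String quote(String identifier) {
        return identifier;
    }

    // Simplified stand-in for getInsertIntoStatement, using positional placeholders.
    static String insertInto(String table, String[] fields) {
        String columns =
                Arrays.stream(fields).map(UpsertSketch::quote).collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Builds the upsert statement without reassigning the unique key columns.
    static String buildUpsert(String table, String[] fields, String[] uniqueKeys) {
        Set<String> keys = new HashSet<>(Arrays.asList(uniqueKeys));
        String uniqueColumns =
                Arrays.stream(uniqueKeys).map(UpsertSketch::quote).collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fields)
                        .filter(f -> !keys.contains(f))
                        .map(f -> quote(f) + "=EXCLUDED." + quote(f))
                        .collect(Collectors.joining(", "));
        // Assumption: fall back to DO NOTHING when no non-key column is left to update.
        String conflictAction =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return insertInto(table, fields) + " ON CONFLICT (" + uniqueColumns + ")" + conflictAction;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(buildUpsert("user_1", fields, new String[] {"id"}));
    }
}
```

For the user_1 table, the generated statement no longer contains id=EXCLUDED.id, which matches the statement that succeeded in step 4 of the experiment.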