Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved

Description

Search before asking
- [X] I searched in the issues and found nothing similar.

Flink version
1.18

Flink CDC version
3.0

Reason

Overview
First, let's look at what the SqlServer incremental source does to guarantee exactly-once semantics in a parallel read. It splits the table into multiple chunks based on the key and treats each chunk as a snapshot split. These splits can be read in parallel.
In the snapshot phase, for each snapshot split covering [low_key, high_key]:
1. use SqlServerDialect#displayCurrentOffset to fetch the current LSN as low_watermark;
2. read the snapshot data in [low_key, high_key] into a temporary state `state1` via a JDBC query;
3. use SqlServerDialect#displayCurrentOffset to fetch the current LSN again as high_watermark;
4. read the log in (low_watermark, high_watermark) and apply it to the temporary state `state1`, producing the *final state at high_watermark*, which is then emitted downstream.

Then, in the stream phase, we read the log in [high_watermark, +∞) for this split's key range [low_key, high_key]. A minimal sketch of this protocol is shown below.
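As an illustration only, here is a compact sketch of that per-split protocol; every type and helper below (`Lsn`, `queryCurrentLsn`, `snapshotQuery`, `readLog`) is a hypothetical stand-in, not the actual connector API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the watermark-based snapshot-split protocol described above. */
public class BackfillSketch {

    record Lsn(long value) {}

    record Change(long key, String row) {}

    /** Reads one snapshot split [lowKey, highKey] so the emitted state is exact. */
    List<String> readSplit(long lowKey, long highKey) {
        Lsn lowWatermark = queryCurrentLsn(); // step 1
        Map<Long, String> state1 = snapshotQuery(lowKey, highKey); // step 2
        Lsn highWatermark = queryCurrentLsn(); // step 3

        // step 4: replay the log in (lowWatermark, highWatermark) onto state1 so the
        // emitted rows reflect the table exactly as of highWatermark (a real
        // implementation would also remove deleted keys, omitted here for brevity)
        for (Change c : readLog(lowWatermark, highWatermark, lowKey, highKey)) {
            state1.put(c.key(), c.row());
        }
        // the stream phase later resumes from [highWatermark, +inf) for this key range
        return new ArrayList<>(state1.values());
    }

    // --- hypothetical stubs standing in for the real connector calls ---
    Lsn queryCurrentLsn() { return new Lsn(0L); }

    Map<Long, String> snapshotQuery(long lowKey, long highKey) { return new LinkedHashMap<>(); }

    List<Change> readLog(Lsn low, Lsn high, long lowKey, long highKey) { return List.of(); }
}
```

The correctness of step 4, and of the stream-phase fence, hinges entirely on low_watermark and high_watermark being true positions in the database log, which is exactly where the problem below comes in.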
Problem
However, `SqlServerDialect#displayCurrentOffset → SqlServerUtils#currentLsn → SqlServerConnection#getMaxTransactionLsn` returns the max LSN of the system table `cdc.lsn_time_mapping`, which is not the real latest LSN of the whole database system. In this incremental source framework, only the real latest LSN of the whole database can guarantee exactly-once semantics.
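For reference, in Debezium that call chain bottoms out in a query over the mapping table roughly like the following (paraphrased, not the exact source):

```java
// Roughly what SqlServerConnection#getMaxTransactionLsn reads: the newest LSN that
// the CDC *capture job* has already copied into cdc.lsn_time_mapping -- not the
// newest LSN of the database's transaction log itself.
static final String GET_MAX_TRANSACTION_LSN =
        "SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00";
```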
As shown in [CDC Data Only Shows up After 5 Minutes](https://stackoverflow.com/questions/29477391/cdc-data-only-shows-up-after-5-minutes), a developer observed exactly this latency, and the reason is:
> Because the capture process extracts change data from the transaction log, there is a built in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table.
For example, low_watermark and high_watermark may be up to 5 minutes behind the LSN at which step 2 (the snapshot read) actually executed. Then, in the streaming phase, log entries that should have been fenced off by high_watermark are read again.
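The lag is easy to observe directly. Below is a minimal JDBC sketch (connection details and table are hypothetical, assuming a CDC-enabled database) that commits a change and then polls `sys.fn_cdc_get_max_lsn()`, which reads the max start_lsn of `cdc.lsn_time_mapping`, until the capture job catches up:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/** Measures how far the CDC capture job lags behind a freshly committed transaction. */
public class CdcLagDemo {
    public static void main(String[] args) throws Exception {
        // hypothetical connection details for a CDC-enabled SQL Server database
        String url = "jdbc:sqlserver://localhost:1433;databaseName=inventory;encrypt=false";
        try (Connection conn = DriverManager.getConnection(url, "sa", "Password!");
                Statement stmt = conn.createStatement()) {
            byte[] before = maxMappedLsn(stmt);
            // hypothetical table; autocommit makes the change durable in the log now
            stmt.executeUpdate("INSERT INTO dbo.customers VALUES (15213, 'user_15213')");
            long start = System.currentTimeMillis();
            // the mapped max LSN only advances once the capture job harvests the log
            while (java.util.Arrays.equals(before, maxMappedLsn(stmt))) {
                Thread.sleep(500);
            }
            System.out.printf("capture job lag: %d ms%n", System.currentTimeMillis() - start);
        }
    }

    private static byte[] maxMappedLsn(Statement stmt) throws Exception {
        try (ResultSet rs = stmt.executeQuery("SELECT sys.fn_cdc_get_max_lsn()")) {
            rs.next();
            return rs.getBytes(1); // LSN is a binary(10) value
        }
    }
}
```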
How to verify
Modify the test com.ververica.cdc.connectors.oracle.source.OracleSourceITCase#testEnableBackfillWithDMLPostLowWaterMark as below, and we can see that three DML operations are read twice.
```java
@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
    List<String> records =
            testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);

    List<String> expectedRecords =
            Arrays.asList(
                    "+I[101, user_1, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "+I[103, user_3, Shanghai, 123567891234]",
                    "+I[109, user_4, Shanghai, 123567891234]",
                    "+I[110, user_5, Shanghai, 123567891234]",
                    "+I[111, user_6, Shanghai, 123567891234]",
                    "+I[118, user_7, Shanghai, 123567891234]",
                    "+I[121, user_8, Shanghai, 123567891234]",
                    "+I[123, user_9, Shanghai, 123567891234]",
                    "+I[1009, user_10, Shanghai, 123567891234]",
                    "+I[1010, user_11, Shanghai, 123567891234]",
                    "+I[1011, user_12, Shanghai, 123567891234]",
                    "+I[1012, user_13, Shanghai, 123567891234]",
                    "+I[1013, user_14, Shanghai, 123567891234]",
                    "+I[1014, user_15, Shanghai, 123567891234]",
                    "+I[1015, user_16, Shanghai, 123567891234]",
                    "+I[1016, user_17, Shanghai, 123567891234]",
                    "+I[1017, user_18, Shanghai, 123567891234]",
                    "+I[1018, user_19, Shanghai, 123567891234]",
                    "+I[2000, user_21, Pittsburgh, 123567891234]",
                    "+I[15213, user_15213, Shanghai, 123567891234]",
                    // these operations were already applied in the snapshot phase,
                    // but are read again in the streaming phase
                    "+I[15213, user_15213, Shanghai, 123567891234]",
                    "-U[2000, user_21, Shanghai, 123567891234]",
                    "+U[2000, user_21, Pittsburgh, 123567891234]",
                    "-D[1019, user_20, Shanghai, 123567891234]");
    assertEqualsInAnyOrder(expectedRecords, records);
}

private List<String> testBackfillWhenWritingEvents(
        boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
    createAndInitialize("customer.sql");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(200L);
    env.setParallelism(1);

    ResolvedSchema customersSchema =
            new ResolvedSchema(
                    Arrays.asList(
                            physical("ID", BIGINT().notNull()),
                            physical("NAME", STRING()),
                            physical("ADDRESS", STRING()),
                            physical("PHONE_NUMBER", STRING())),
                    new ArrayList<>(),
                    UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
    TestTable customerTable =
            new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
    String tableId = customerTable.getTableId();

    OracleSourceBuilder.OracleIncrementalSource source =
            OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
                    .hostname(ORACLE_CONTAINER.getHost())
                    .port(ORACLE_CONTAINER.getOraclePort())
                    .username(CONNECTOR_USER)
                    .password(CONNECTOR_PWD)
                    .databaseList(ORACLE_DATABASE)
                    .schemaList(ORACLE_SCHEMA)
                    .tableList("DEBEZIUM.CUSTOMERS")
                    .skipSnapshotBackfill(skipSnapshotBackfill)
                    .startupOptions(StartupOptions.initial())
                    .deserializer(customerTable.getDeserializer())
                    .build();

    // Do some database operations during hook in snapshot period.
    SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
    // NOTE: the array literal was lost in the import; reconstructed here from the
    // expected records above (one INSERT, one UPDATE, one DELETE)
    String[] statements =
            new String[] {
                String.format(
                        "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                        tableId),
                String.format("UPDATE %s SET ADDRESS = 'Pittsburgh' WHERE ID = 2000", tableId),
                String.format("DELETE FROM %s WHERE ID = 1019", tableId)
            };
    SnapshotPhaseHook snapshotPhaseHook =
            (sourceConfig, split) -> {
                // database update operations use TEST_USER rather than CONNECTOR_USER
                JdbcConfiguration configuration =
                        JdbcConfiguration.copy(
                                        ((JdbcSourceConfig) sourceConfig)
                                                .getDbzConnectorConfig()
                                                .getJdbcConfig())
                                .withUser("debezium")
                                .withPassword("dbz")
                                .build();
                try (OracleConnection oracleConnection =
                        OracleConnectionUtils.createOracleConnection(configuration)) {
                    oracleConnection.execute(statements);
                }
            };

    if (hookType == USE_POST_LOWWATERMARK_HOOK) {
        hooks.setPostLowWatermarkAction(snapshotPhaseHook);
    } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
        hooks.setPreHighWatermarkAction(snapshotPhaseHook);
    }
    source.setSnapshotHooks(hooks);

    List<String> records = new ArrayList<>();
    try (CloseableIterator<RowData> iterator =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                    .executeAndCollect()) {
        // NOTE: the try body was lost in the import; reconstructed with the test
        // suite's fetchRowData helper, which drains fetchSize rows as strings
        records = fetchRowData(iterator, fetchSize, customerTable::stringify);
    }
    return records;
}
```
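In the expected records above, the insert of row 15213, the update of row 2000, and the delete of row 1019 are already reflected in the snapshot output, yet their change-log events show up again in the streaming phase: exactly the duplication that a correct high_watermark would have fenced off.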
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2853
Created by: loserwang1024
Labels: bug
Assignee: loserwang1024
Created at: Tue Dec 12 10:44:24 CST 2023
State: open