  Flink / FLINK-34833

[Bug][sqlserver] SqlServer incremental source cannot support exactly-once


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component: Flink CDC

    Description

        1. Search before asking
      • [X] I searched in the issues and found nothing similar.
        2. Flink version

      1.18

        3. Flink CDC version

      3.0

        4. Reason
          1. Overview
            First, let's look at what the SqlServer incremental source does to guarantee exactly-once semantics with parallel reads.
            The table is split into multiple chunks based on the key, and each chunk becomes a snapshot split. These splits can be read in parallel.

      In the snapshot phase, for each snapshot split covering [low_key, high_key]:
      1. use SqlServerDialect#displayCurrentOffset to get the current LSN as low_watermark
      2. read the snapshot data in [low_key, high_key] into a temporary state `state1` via a JDBC query
      3. use SqlServerDialect#displayCurrentOffset to get the current LSN again as high_watermark
      4. read the change log in (low_watermark, high_watermark), apply it to the temporary state `state1` to produce the *final state as of high_watermark*, then emit it downstream

      Then, in the stream phase, we read the log in [high_watermark, +∞) for this split's key range [low_key, high_key].
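
      To make the four steps above concrete, here is a minimal, self-contained sketch (not the connector's actual code; `queryCurrentLsn`, `readSnapshot` and `readLogBetween` are hypothetical stand-ins for `SqlServerDialect#displayCurrentOffset`, the JDBC snapshot query, and the backfill log read):

      ```java
      import java.util.ArrayList;
      import java.util.LinkedHashMap;
      import java.util.List;
      import java.util.Map;

      /** Minimal sketch of the per-split snapshot flow; all helpers are hypothetical placeholders. */
      public class SnapshotSplitSketch {

          record ChangeEvent(long lsn, long key, String row, boolean delete) {}

          // Stand-in for SqlServerDialect#displayCurrentOffset (steps 1 and 3).
          static long queryCurrentLsn() {
              return 0L;
          }

          // Stand-in for the JDBC snapshot query over [lowKey, highKey] (step 2).
          static Map<Long, String> readSnapshot(long lowKey, long highKey) {
              return new LinkedHashMap<>();
          }

          // Stand-in for the backfill read of the change log (step 4).
          static List<ChangeEvent> readLogBetween(long fromLsn, long toLsn) {
              return new ArrayList<>();
          }

          static Map<Long, String> readSplit(long lowKey, long highKey) {
              long lowWatermark = queryCurrentLsn();                     // 1. LSN before the snapshot read
              Map<Long, String> state1 = readSnapshot(lowKey, highKey);  // 2. snapshot of [low_key, high_key]
              long highWatermark = queryCurrentLsn();                    // 3. LSN after the snapshot read
              // 4. backfill: replay the log in (lowWatermark, highWatermark) onto the temporary
              //    state, so the emitted result is the split's final state as of highWatermark.
              for (ChangeEvent e : readLogBetween(lowWatermark, highWatermark)) {
                  if (e.key() >= lowKey && e.key() <= highKey) {
                      if (e.delete()) {
                          state1.remove(e.key());
                      } else {
                          state1.put(e.key(), e.row());
                      }
                  }
              }
              return state1; // emitted downstream; the stream phase continues from [highWatermark, +∞)
          }

          public static void main(String[] args) {
              System.out.println(readSplit(0, 1000)); // empty with the placeholder implementations above
          }
      }
      ```

      The exactly-once argument rests on high_watermark really being at or beyond every change that the snapshot query in step 2 has already observed.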

          2. Problem
            However, `SqlServerDialect#displayCurrentOffset → SqlServerUtils#currentLsn → SqlServerConnection#getMaxTransactionLsn` returns the max LSN of the system table `cdc.lsn_time_mapping`, which is not the real latest LSN of the whole database. In this incremental source framework, only the real latest LSN of the whole database can guarantee exactly-once semantics.
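
      To make that call chain concrete, here is a minimal JDBC sketch of such a lookup (the exact SQL used by Debezium may differ, and the connection details are hypothetical); the point is that `cdc.lsn_time_mapping` is populated asynchronously by the SQL Server capture job, so its max LSN can trail the real end of the transaction log:

      ```java
      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.ResultSet;
      import java.sql.Statement;
      import java.util.Arrays;

      public class MaxCdcLsnSketch {
          public static void main(String[] args) throws Exception {
              // Hypothetical connection details; requires the mssql-jdbc driver on the classpath.
              String url = "jdbc:sqlserver://localhost:1433;databaseName=testdb;encrypt=false";
              try (Connection conn = DriverManager.getConnection(url, "sa", "Password!");
                      Statement stmt = conn.createStatement();
                      // Newest LSN known to CDC. cdc.lsn_time_mapping is filled asynchronously by
                      // the capture job, so this value can lag behind the true end of the log.
                      ResultSet rs = stmt.executeQuery(
                              "SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00")) {
                  if (rs.next()) {
                      byte[] maxCdcLsn = rs.getBytes(1); // binary(10) LSN, or null if CDC has seen nothing yet
                      System.out.println("max LSN visible to CDC: " + Arrays.toString(maxCdcLsn));
                  }
              }
          }
      }
      ```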

      As shown in https://stackoverflow.com/questions/29477391/cdc-data-only-shows-up-after-5-minutes, a developer found that CDC data only shows up after 5 minutes, and the reason is:
      > Because the capture process extracts change data from the transaction log, there is a built in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table.

      For example, low_watermark and high_watermark may be several minutes behind the LSNs of changes that the snapshot query in step 2 has already observed: those changes are committed to the transaction log before the snapshot is read, but they have not yet shown up in `cdc.lsn_time_mapping` when the watermarks are taken. The backfill of step 4 therefore does not cover them, yet the emitted snapshot already contains their effect; then, in the streaming phase, those log entries, which should have been ignored, are read again.
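
      A tiny worked example with made-up LSN values (modeled as longs for simplicity) shows how that lag turns into a duplicate:

      ```java
      /** Made-up LSN values illustrating the duplicate caused by the CDC propagation lag. */
      public class WatermarkLagExample {
          public static void main(String[] args) {
              long changeLsn = 180L;      // change committed before the snapshot query, so the snapshot already contains it
              long lowWatermark = 140L;   // max LSN in cdc.lsn_time_mapping before the snapshot read (lagging)
              long highWatermark = 150L;  // max LSN in cdc.lsn_time_mapping after the snapshot read (still lagging)

              // The backfill window (lowWatermark, highWatermark) does not cover the change ...
              boolean coveredByBackfill = changeLsn > lowWatermark && changeLsn < highWatermark;
              // ... but the streaming phase, which starts at highWatermark, reads it later anyway,
              // even though its effect is already in the emitted snapshot -> a duplicate.
              boolean reReadInStream = changeLsn >= highWatermark;

              System.out.println("covered by backfill: " + coveredByBackfill); // false
              System.out.println("re-read in stream:   " + reReadInStream);    // true
          }
      }
      ```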

        5. How to verify
          Modify the test com.ververica.cdc.connectors.oracle.source.OracleSourceITCase#testEnableBackfillWithDMLPostLowWaterMark as below; we can see that the three DML operations, although already applied in the snapshot phase, are read again in the streaming phase.
      ```java
      @Test
      public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
          List<String> records =
                  testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);
          List<String> expectedRecords =
                  Arrays.asList(
                          "+I[101, user_1, Shanghai, 123567891234]",
                          "+I[102, user_2, Shanghai, 123567891234]",
                          "+I[103, user_3, Shanghai, 123567891234]",
                          "+I[109, user_4, Shanghai, 123567891234]",
                          "+I[110, user_5, Shanghai, 123567891234]",
                          "+I[111, user_6, Shanghai, 123567891234]",
                          "+I[118, user_7, Shanghai, 123567891234]",
                          "+I[121, user_8, Shanghai, 123567891234]",
                          "+I[123, user_9, Shanghai, 123567891234]",
                          "+I[1009, user_10, Shanghai, 123567891234]",
                          "+I[1010, user_11, Shanghai, 123567891234]",
                          "+I[1011, user_12, Shanghai, 123567891234]",
                          "+I[1012, user_13, Shanghai, 123567891234]",
                          "+I[1013, user_14, Shanghai, 123567891234]",
                          "+I[1014, user_15, Shanghai, 123567891234]",
                          "+I[1015, user_16, Shanghai, 123567891234]",
                          "+I[1016, user_17, Shanghai, 123567891234]",
                          "+I[1017, user_18, Shanghai, 123567891234]",
                          "+I[1018, user_19, Shanghai, 123567891234]",
                          "+I[2000, user_21, Pittsburgh, 123567891234]",
                          "+I[15213, user_15213, Shanghai, 123567891234]",
                          // the following operations were already applied in the snapshot phase,
                          // but are read again in the streaming phase
                          "+I[15213, user_15213, Shanghai, 123567891234]",
                          "-U[2000, user_21, Shanghai, 123567891234]",
                          "+U[2000, user_21, Pittsburgh, 123567891234]",
                          "-D[1019, user_20, Shanghai, 123567891234]");
          assertEqualsInAnyOrder(expectedRecords, records);
      }

      private List<String> testBackfillWhenWritingEvents(
              boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
          createAndInitialize("customer.sql");
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.enableCheckpointing(200L);
          env.setParallelism(1);

          ResolvedSchema customersSchema =
                  new ResolvedSchema(
                          Arrays.asList(
                                  physical("ID", BIGINT().notNull()),
                                  physical("NAME", STRING()),
                                  physical("ADDRESS", STRING()),
                                  physical("PHONE_NUMBER", STRING())),
                          new ArrayList<>(),
                          UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
          TestTable customerTable =
                  new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
          String tableId = customerTable.getTableId();

          OracleSourceBuilder.OracleIncrementalSource source =
                  OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
                          .hostname(ORACLE_CONTAINER.getHost())
                          .port(ORACLE_CONTAINER.getOraclePort())
                          .username(CONNECTOR_USER)
                          .password(CONNECTOR_PWD)
                          .databaseList(ORACLE_DATABASE)
                          .schemaList(ORACLE_SCHEMA)
                          .tableList("DEBEZIUM.CUSTOMERS")
                          .skipSnapshotBackfill(skipSnapshotBackfill)
                          .startupOptions(StartupOptions.initial())
                          .deserializer(customerTable.getDeserializer())
                          .build();

          // Do some database operations during hook in snapshot period.
          SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
          String[] statements =
                  new String[] {
                      String.format(
                              "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                              tableId),
                      String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
                      String.format("DELETE FROM %s WHERE id=1019", tableId)
                  };
          SnapshotPhaseHook snapshotPhaseHook =
                  (sourceConfig, split) -> {
                      // database update operations use TEST_USER rather than CONNECTOR_USER
                      JdbcConfiguration configuration =
                              JdbcConfiguration.copy(
                                              ((JdbcSourceConfig) sourceConfig)
                                                      .getDbzConnectorConfig()
                                                      .getJdbcConfig())
                                      .withUser("debezium")
                                      .withPassword("dbz")
                                      .build();
                      try (OracleConnection oracleConnection =
                              OracleConnectionUtils.createOracleConnection(configuration)) {
                          oracleConnection.setAutoCommit(false);
                          oracleConnection.execute(statements);
                          oracleConnection.commit();
                      }
                  };

          if (hookType == USE_POST_LOWWATERMARK_HOOK) {
              hooks.setPostLowWatermarkAction(snapshotPhaseHook);
          } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
              hooks.setPreHighWatermarkAction(snapshotPhaseHook);
          }
          source.setSnapshotHooks(hooks);

          List<String> records = new ArrayList<>();
          try (CloseableIterator<RowData> iterator =
                  env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                          .executeAndCollect()) {
              records = fetchRowData(iterator, fetchSize, customerTable::stringify);
              env.close();
          }
          return records;
      }
      ```

        6. Are you willing to submit a PR?
      • [ ] I'm willing to submit a PR!

      ---------------- Imported from GitHub ----------------
      Url: https://github.com/apache/flink-cdc/issues/2853
      Created by: loserwang1024
      Labels: bug,
      Assignee: loserwang1024
      Created at: Tue Dec 12 10:44:24 CST 2023
      State: open
