FLINK-34859

[Bug] Oracle CDC in Table API does not support server-time-zone option

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Flink CDC

    Description

          1. Search before asking
      • [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
          1. Flink version

      1.17.1

          1. Flink CDC version

      3.0.0

          1. Database and its version

      Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production With the Partitioning, OLAP, Data Mining and Real Application Testing options

          1. Minimal reproduce step
        1. Create a CDC source in the Table API with the `server-time-zone` option specified.

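      ```java
      // OracleSourceOptions package is assumed from the Flink CDC 3.0.0 layout; adjust if it differs.
      import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
      import com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.Schema;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.TableDescriptor;
      import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;

      public class FlinkOracleCdcTest {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
              env.setParallelism(1);

              Schema schema = Schema.newBuilder()
                      .column("NAME", DataTypes.STRING())
                      .column("ADDR", DataTypes.STRING())
                      .build();

              // Resolves to 'oracle-cdc', as shown in the table options of the exception.
              String factoryIdentifier = new OracleTableSourceFactory().factoryIdentifier();
              TableDescriptor tableDescriptor = TableDescriptor.forConnector(factoryIdentifier)
                      .schema(schema)
                      // .format(DebeziumJsonFormatFactory.IDENTIFIER)
                      .option(OracleSourceOptions.HOSTNAME, "my-oracle-host")
                      .option(OracleSourceOptions.PORT, 1521)
                      .option(OracleSourceOptions.USERNAME, "my-oracle-username")
                      .option(OracleSourceOptions.PASSWORD, "my-oracle-password")
                      .option(OracleSourceOptions.DATABASE_NAME, "my-oracle-database")
                      .option(OracleSourceOptions.SCHEMA_NAME, "my-oracle-schema")
                      .option(OracleSourceOptions.TABLE_NAME, "TEST")
                      .option(OracleSourceOptions.SCAN_STARTUP_MODE, "initial")
                      .option(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)
                      .option(OracleSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, 10)
                      // This is the option that the table factory rejects.
                      .option(OracleSourceOptions.SERVER_TIME_ZONE, "Asia/Shanghai")
                      .option("debezium.include.schema.changes", "false")
                      .option("debezium.database.history.store.only.captured.tables.ddl", "true")
                      .build();

              StreamTableEnvironmentImpl tEnv = (StreamTableEnvironmentImpl) StreamTableEnvironmentImpl.create(
                      env, EnvironmentSettings.newInstance().inStreamingMode().build());
              Table table = tEnv.from(tableDescriptor);
              // The ValidationException is thrown here, while the source is being created.
              tEnv.toChangelogStream(table).print();
              env.execute();
          }
      }
      ```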

        1. The job fails with the following exception:
      ```text
          Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'anonymous_oracle-cdc$1'.

      Table options are:

      'connector'='oracle-cdc'
      'database-name'='my-oracle-database'
      'debezium.database.history.store.only.captured.tables.ddl'='true'
      'debezium.include.schema.changes'='false'
      'hostname'='my-oracle-host'
      'password'='******'
      'port'='1521'
      'scan.incremental.snapshot.enabled'='false'
      'scan.snapshot.fetch.size'='10'
      'scan.startup.mode'='initial'
      'schema-name'='my-oracle-schema'
      'server-time-zone'='Asia/Shanghai'
      'table-name'='TEST'
      'username'='my-oracle-username'
      at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
      at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
      at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
      at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
      at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:357)
      at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
      at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
      at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
      at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
      at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
      at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
      at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
      at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:289)
      at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
      at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
      at scala.collection.Iterator.foreach(Iterator.scala:943)
      at scala.collection.Iterator.foreach$(Iterator.scala:943)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
      at scala.collection.IterableLike.foreach(IterableLike.scala:74)
      at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
      at scala.collection.TraversableLike.map(TraversableLike.scala:286)
      at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
      at scala.collection.AbstractTraversable.map(Traversable.scala:108)
      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
      at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
      at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
      at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
      at org.codebase.flink.cdc.FlinkOracleCdcTest.main(FlinkOracleCdcTest.java:70)
      Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'oracle-cdc'.

      Unsupported options:

      server-time-zone

      Supported options:

      chunk-key.even-distribution.factor.lower-bound
      chunk-key.even-distribution.factor.upper-bound
      chunk-meta.group.size
      connect.max-retries
      connect.timeout
      connection.pool.size
      connector
      database-name
      debezium.database.history.store.only.captured.tables.ddl
      debezium.include.schema.changes
      hostname
      password
      port
      property-version
      scan.incremental.close-idle-reader.enabled
      scan.incremental.snapshot.backfill.skip
      scan.incremental.snapshot.chunk.key-column
      scan.incremental.snapshot.chunk.size
      scan.incremental.snapshot.enabled
      scan.snapshot.fetch.size
      scan.startup.mode
      schema-name
      split-key.even-distribution.factor.lower-bound
      split-key.even-distribution.factor.upper-bound
      table-name
      url
      username
      at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:632)
      at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:931)
      at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:955)
      at com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory.createDynamicTableSource(OracleTableSourceFactory.java:70)
      at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164)
      ... 28 more
      ```

          1. What did you expect to see?

      The program should run and print the captured change data.

          1. What did you see instead?

      The `ValidationException` shown above is thrown while the table source is created, before any data is read.

          1. Anything else?

      [Two screenshots from the original issue; images not available]

      As the screenshots show, `OracleTableSourceFactory` does not support the `server-time-zone` option, whereas `MySqlTableSourceFactory` does. Note that the option can already be specified through the `OracleSourceBuilder` class.
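
      To illustrate why the factory rejects the option, here is a minimal, self-contained sketch of an unconsumed-key check. It is a simplified model and not Flink's actual `FactoryUtil` code: the class `OptionValidationSketch`, the method `findUnsupported`, and the exempt `debezium.` prefix are assumptions made for illustration (the prefix is inferred from the `validateExcept` frame in the stack trace). The option keys are a subset of those in the report.

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import java.util.Set;
      import java.util.TreeMap;
      import java.util.TreeSet;

      /** Simplified model of unconsumed-option validation; hypothetical, not Flink's code. */
      class OptionValidationSketch {

          // Assumption: keys with this prefix are passed through without validation.
          static final String EXEMPT_PREFIX = "debezium.";

          /** Returns the provided keys that are neither declared nor exempt, sorted by name. */
          static List<String> findUnsupported(Map<String, String> provided, Set<String> declared) {
              List<String> unsupported = new ArrayList<>();
              for (String key : new TreeSet<>(provided.keySet())) {
                  if (!declared.contains(key) && !key.startsWith(EXEMPT_PREFIX)) {
                      unsupported.add(key);
                  }
              }
              return unsupported;
          }

          public static void main(String[] args) {
              // A subset of the table options from the report.
              Map<String, String> provided = new TreeMap<>();
              provided.put("connector", "oracle-cdc");
              provided.put("hostname", "my-oracle-host");
              provided.put("server-time-zone", "Asia/Shanghai");
              provided.put("debezium.include.schema.changes", "false");

              // A subset of the keys listed under "Supported options" in the trace.
              Set<String> declared =
                      new TreeSet<>(Set.of("connector", "hostname", "port", "table-name"));

              System.out.println(findUnsupported(provided, declared)); // [server-time-zone]

              // Hypothetical fix: the factory additionally declares the key.
              declared.add("server-time-zone");
              System.out.println(findUnsupported(provided, declared)); // []
          }
      }
      ```

      Under this model, the error goes away once the factory declares `server-time-zone` among its supported options. The actual change would also have to forward the value to the source configuration, which the sketch does not cover.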

          1. Are you willing to submit a PR?
      • [X] I'm willing to submit a PR!

      ---------------- Imported from GitHub ----------------
      Url: https://github.com/apache/flink-cdc/issues/2977
      Created by: LiuBodong
      Labels: bug
      Created at: Tue Jan 09 09:24:10 CST 2024
      State: open

          People

            Assignee: Unassigned
            Reporter: Flink CDC Issue Import (flink-cdc-import)