Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
Description
-
-
- Search before asking
-
- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
-
-
- Flink version
-
1.17.1
-
-
- Flink CDC version
-
3.0.0
-
-
- Database and its version
-
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production With the Partitioning, OLAP, Data Mining and Real Application Testing options
-
-
- Minimal reproduce step
-
-
- Create a CDC source with the Table API and specify the `server-time-zone` option.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);

Schema schema = Schema.newBuilder()
        .column("NAME", DataTypes.STRING())
        .column("ADDR", DataTypes.STRING())
        .build();

String factoryIdentifier = new OracleTableSourceFactory().factoryIdentifier();
TableDescriptor tableDescriptor = TableDescriptor.forConnector(factoryIdentifier)
        .schema(schema)
        // .format(DebeziumJsonFormatFactory.IDENTIFIER)
        .option(OracleSourceOptions.HOSTNAME, "my-oracle-host")
        .option(OracleSourceOptions.PORT, 1521)
        .option(OracleSourceOptions.USERNAME, "my-oracle-username")
        .option(OracleSourceOptions.PASSWORD, "my-oracle-password")
        .option(OracleSourceOptions.DATABASE_NAME, "my-oracle-database")
        .option(OracleSourceOptions.SCHEMA_NAME, "my-oracle-schema")
        .option(OracleSourceOptions.TABLE_NAME, "TEST")
        .option(OracleSourceOptions.SCAN_STARTUP_MODE, "initial")
        .option(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)
        .option(OracleSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, 10)
        // This is the option that triggers the ValidationException.
        .option(OracleSourceOptions.SERVER_TIME_ZONE, "Asia/Shanghai")
        .option("debezium.include.schema.changes", "false")
        .option("debezium.database.history.store.only.captured.tables.ddl", "true")
        .build();

StreamTableEnvironmentImpl tEnv = (StreamTableEnvironmentImpl) StreamTableEnvironmentImpl.create(
        env, EnvironmentSettings.newInstance().inStreamingMode().build());
Table table = tEnv.from(tableDescriptor);
tEnv.toChangelogStream(table).print();
env.execute();
```
-
- Exceptions are:
```text
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'anonymous_oracle-cdc$1'.
Table options are:
'connector'='oracle-cdc'
'database-name'='my-oracle-database'
'debezium.database.history.store.only.captured.tables.ddl'='true'
'debezium.include.schema.changes'='false'
'hostname'='my-oracle-host'
'password'='******'
'port'='1521'
'scan.incremental.snapshot.enabled'='false'
'scan.snapshot.fetch.size'='10'
'scan.startup.mode'='initial'
'schema-name'='my-oracle-schema'
'server-time-zone'='Asia/Shanghai'
'table-name'='TEST'
'username'='my-oracle-username'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:357)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:289)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
at org.codebase.flink.cdc.FlinkOracleCdcTest.main(FlinkOracleCdcTest.java:70)
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'oracle-cdc'.
Unsupported options:
server-time-zone
Supported options:
chunk-key.even-distribution.factor.lower-bound
chunk-key.even-distribution.factor.upper-bound
chunk-meta.group.size
connect.max-retries
connect.timeout
connection.pool.size
connector
database-name
debezium.database.history.store.only.captured.tables.ddl
debezium.include.schema.changes
hostname
password
port
property-version
scan.incremental.close-idle-reader.enabled
scan.incremental.snapshot.backfill.skip
scan.incremental.snapshot.chunk.key-column
scan.incremental.snapshot.chunk.size
scan.incremental.snapshot.enabled
scan.snapshot.fetch.size
scan.startup.mode
schema-name
split-key.even-distribution.factor.lower-bound
split-key.even-distribution.factor.upper-bound
table-name
url
username
at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:632)
at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:931)
at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:955)
at com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory.createDynamicTableSource(OracleTableSourceFactory.java:70)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164)
... 28 more
```
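For readers unfamiliar with this error, the `Caused by` part of the trace points at option validation (`validateExcept` calling `validateUnconsumedKeys`): every key in the table options has to be declared by the factory, otherwise table creation is rejected. Below is a minimal sketch of that kind of check. It is a simplified model and not Flink's actual implementation; the class `OptionValidationSketch`, the method `findUnsupportedKeys`, and the shortened key lists are hypothetical, and treating `debezium.` as a pass-through prefix is an assumption based on the `validateExcept` call and the supported-options list in the trace.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class OptionValidationSketch {

    // Hypothetical helper: returns the option keys that are neither declared by the
    // factory nor covered by the pass-through prefix. Not Flink's real code.
    public static List<String> findUnsupportedKeys(
            Map<String, String> tableOptions,
            Set<String> declaredKeys,
            String passThroughPrefix) {
        return tableOptions.keySet().stream()
                .filter(key -> !key.startsWith(passThroughPrefix))
                .filter(key -> !declaredKeys.contains(key))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Shortened subset of the "Supported options" list from the trace.
        Set<String> declaredKeys = Set.of("connector", "hostname", "port", "table-name");

        // Shortened subset of the table options from the reproduce step.
        Map<String, String> tableOptions = Map.of(
                "connector", "oracle-cdc",
                "hostname", "my-oracle-host",
                "port", "1521",
                "table-name", "TEST",
                "server-time-zone", "Asia/Shanghai",
                "debezium.include.schema.changes", "false");

        // Prints: [server-time-zone]
        System.out.println(findUnsupportedKeys(tableOptions, declaredKeys, "debezium."));
    }
}
```

In this model, adding `server-time-zone` to the declared keys makes the list empty, which corresponds to the factory declaring the option as supported.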
-
-
- What did you expect to see?
-
The program should run and print the captured change data.
-
-
- What did you see instead?
-
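A `ValidationException` ("Unsupported options found for 'oracle-cdc'") that lists `server-time-zone` as an unsupported option.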
-
-
- Anything else?
-
As shown in the screenshots, `OracleTableSourceFactory` does not support the `server-time-zone` option, whereas `MySqlTableSourceFactory` does. Note that this option can already be specified through the `OracleSourceBuilder` class.
-
-
- Are you willing to submit a PR?
-
- [X] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2977
Created by: LiuBodong
Labels: bug
Created at: Tue Jan 09 09:24:10 CST 2024
State: open