Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
cdc-3.2.0, cdc-3.1.1, cdc-3.3.0
Description
Oracle When reading via OracleIncrementalSource, the connection is occasionally closed.
14:57:56,432 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher 14:57:56,597 INFO io.debezium.jdbc.JdbcConnection [pool-14-thread-1] [] - Connection gracefully closed 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler [debezium-snapshot-reader-0] [] - Producer failure java.sql.SQLException: 关闭的 Resultset: getLong at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0] at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final] at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final] at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final] at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final] at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?] at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?] at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?] at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?] at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?] at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
reason:
This is because after split is read, the reader will be closed, at which point LogMinerStreamingChangeEventSource will perform captureSessionMemoryStatistics to obtain statistical information.
Finally, in the code
public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException { Objects.requireNonNull(mapper, "Mapper must be provided"); Connection conn = connection(); // Check if the conn is connected try (Statement statement = statementFactory.createStatement(conn);) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("running '{}'", query); } try (ResultSet resultSet = statement.executeQuery(query);) { //When you get here, split executes the close method to close the connection, and an error will be reported return mapper.apply(resultSet); } } }
solve:
1. we can regenerate a connection before calling the captureSessionMemoryStatistics(connection) method, but this will be time-consuming. In my local test, it took 6 seconds.
2. Since captureSessionMemoryStatistics is just statistical information, I think it can be placed before process, so that it can ensure that the connection is no longer in use when split reader close