Flink / FLINK-36649

Oracle: When reading via OracleIncrementalSource, the connection is occasionally closed


Details

    Description

When reading from Oracle via OracleIncrementalSource, the underlying JDBC connection is occasionally closed while it is still in use, which causes the LogMiner session to fail:

       

      14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
      14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                              [pool-14-thread-1] [] - Connection gracefully closed
      14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper         [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong
      14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                            [debezium-snapshot-reader-0] [] - Producer failure
      java.sql.SQLException: 关闭的 Resultset: getLong
          at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
          at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
          at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
          at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
          at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
          at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?]
          at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?]
          at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?]
          at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?]
          at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?]
          at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
          at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] 

       

Reason

(The Chinese error text "关闭的 Resultset: getLong" is Oracle's localized message for "Closed ResultSet: getLong".)

After the split has been read, the split reader closes the shared JDBC connection. At that point LogMinerStreamingChangeEventSource is still calling captureSessionMemoryStatistics over the same connection to collect session statistics.

The failure surfaces in JdbcConnection#queryAndMap:

       

public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
    Objects.requireNonNull(mapper, "Mapper must be provided");
    Connection conn = connection();      // validates that the connection is open at this point
    try (Statement statement = statementFactory.createStatement(conn)) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("running '{}'", query);
        }
        try (ResultSet resultSet = statement.executeQuery(query)) {
            // If the split reader closes the connection between the check above
            // and this call, the ResultSet is already closed and mapper.apply(...)
            // throws "Closed ResultSet: getLong".
            return mapper.apply(resultSet);
        }
    }
}
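The window between the connection() check and mapper.apply(resultSet) is exactly where the split reader's close() lands. A minimal, self-contained sketch of that race follows; the classes below are illustrative stand-ins, not the real JDBC or Debezium types, and latches force the interleaving deterministically:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class ClosedResultSetRace {

    // Stand-in for a ResultSet that fails once its connection is closed.
    static final class FakeResultSet {
        private final AtomicBoolean closed;
        FakeResultSet(AtomicBoolean closed) { this.closed = closed; }
        long getLong() {
            if (closed.get()) {
                // Mirrors Oracle's "关闭的 Resultset: getLong" ("Closed ResultSet: getLong")
                throw new IllegalStateException("Closed ResultSet: getLong");
            }
            return 42L;
        }
    }

    /** Runs both threads and returns the error the statistics thread observes. */
    static String demonstrateRace() throws InterruptedException {
        AtomicBoolean connectionClosed = new AtomicBoolean(false);
        CountDownLatch checkDone = new CountDownLatch(1);
        CountDownLatch closeDone = new CountDownLatch(1);
        String[] failure = new String[1];

        Thread statsThread = new Thread(() -> {
            // 1. The connection() check passes: the connection is still open here.
            if (!connectionClosed.get()) {
                checkDone.countDown();
                try {
                    closeDone.await(); // the close happens exactly in this gap
                    // 3. The ResultSet is used only after close() already ran.
                    new FakeResultSet(connectionClosed).getLong();
                } catch (InterruptedException ignored) {
                } catch (IllegalStateException e) {
                    failure[0] = e.getMessage();
                }
            }
        });
        statsThread.start();

        checkDone.await();
        connectionClosed.set(true); // 2. split reader closes the shared connection
        closeDone.countDown();
        statsThread.join();
        return failure[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Producer failure: " + demonstrateRace());
    }
}
```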

       

Proposed solutions:

1. Re-create a connection before calling captureSessionMemoryStatistics(connection). This works but is slow: in my local test, establishing the new connection took about 6 seconds.

2. Since captureSessionMemoryStatistics only collects statistics, it could instead be called before the processing loop. That guarantees the connection is no longer in use when the split reader closes it.
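Option 2 can be sketched as follows. This is a minimal model, not the real Debezium code: runMiningLoop, execute, and the returned value 1024 are hypothetical stand-ins, assuming only that the statistics query needs a live connection and does not depend on the loop's results:

```java
// Hedged sketch of proposed fix #2: capture session statistics *before* the
// mining loop, while the connection is guaranteed to still be open.
public class StatsBeforeLoop {

    static boolean connectionOpen = true;

    // Stand-in statistics query; fails the same way the real one does
    // if the connection has already been closed.
    static long captureSessionMemoryStatistics() {
        if (!connectionOpen) {
            throw new IllegalStateException("Closed ResultSet: getLong");
        }
        return 1024L; // pretend session memory statistic
    }

    static void runMiningLoop() {
        // ... emit change events for the split; when the split finishes,
        // the split reader closes the shared connection during shutdown:
        connectionOpen = false;
    }

    static long execute() {
        // Fix: the statistics call is moved before the loop. Previously it sat
        // after runMiningLoop() and could race with the split reader's close().
        long stat = captureSessionMemoryStatistics();
        runMiningLoop();
        // Nothing touches the connection after the loop, so a concurrent close
        // can no longer hit an in-flight statistics query.
        return stat;
    }

    public static void main(String[] args) {
        System.out.println("session memory stat: " + execute());
    }
}
```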

       

       

People

Assignee: Di Wu
Reporter: Di Wu