Flink / FLINK-16281

Parameter 'maxRetryTimes' does not work in JDBCUpsertTableSink


    Details

      Description

      When I insert data into a MySQL table that does not exist in my test database, I get the following exception:

      Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'test.gmv_table' doesn't exist
          at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
          at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
          at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
          at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
          at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
          at com.mysql.jdbc.Util.getInstance(Util.java:408)
          at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:944)
          at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3933)
          at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3869)
          at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
          at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
          at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
          at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1912)
          at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2133)
          at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1810)
          ... 44 more
      

      However, after I increased the 'connector.write.max-retries' value from 1 to 3, the exception disappeared, although the table still did not exist.

      So I looked at the implementation of flush():

      public synchronized void flush() throws Exception {
         checkFlushException();
      
         for (int i = 1; i <= maxRetryTimes; i++) {
            try {
               jdbcWriter.executeBatch();
               batchCount = 0;
               break;
            } catch (SQLException e) {
               LOG.error("JDBC executeBatch error, retry times = {}", i, e);
               if (i >= maxRetryTimes) {
                  throw e;
               }
               Thread.sleep(1000 * i);
            }
         }
      }
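
To make the loop's timing concrete, here is a minimal sketch that replays only its control flow. It assumes every attempt fails; the class and method names (`RetryBackoffSketch`, `sleepsMillis`) are made up for illustration and are not part of Flink. It shows that `maxRetryTimes` counts total attempts, so a value of 1 means no retry at all, and that the final attempt rethrows instead of sleeping.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the control flow of the flush() loop above.
class RetryBackoffSketch {

    /** Sleep durations (ms) between attempts when every attempt fails. */
    static List<Long> sleepsMillis(int maxRetryTimes) {
        List<Long> sleeps = new ArrayList<>();
        for (int i = 1; i <= maxRetryTimes; i++) {
            if (i >= maxRetryTimes) {
                break; // the last attempt rethrows instead of sleeping
            }
            sleeps.add(1000L * i); // same formula as Thread.sleep(1000 * i)
        }
        return sleeps;
    }

    public static void main(String[] args) {
        System.out.println(sleepsMillis(1)); // prints []
        System.out.println(sleepsMillis(3)); // prints [1000, 2000]
    }
}
```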

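      I found that the MySQL `PreparedStatement` behind `jdbcWriter` clears its `batchedArgs` member after the first call to `jdbcWriter.executeBatch()`, even when that call fails, because `clearBatch()` runs in a `finally` block: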

      //com.mysql.jdbc.PreparedStatement
      finally {
          this.statementExecuting.set(false);
          clearBatch();
      }
      
      // clearBatch() implementation
      public void clearBatch() throws SQLException {
          synchronized (checkClosed().getConnectionMutex()) {
              if (this.batchedArgs != null) {
                  this.batchedArgs.clear();
              }
          }
      }
      

      On the next call to `jdbcWriter.executeBatch()` (where i > 1), the function returns an empty array instead of flushing the data:

      //com.mysql.jdbc.PreparedStatement
      if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
          return new long[0];
      }
      ... // flush data code
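
The interaction described above can be reproduced without a database. The sketch below is a simulation under stated assumptions: `FakeStatement` is a hypothetical stand-in that only imitates the two quoted driver behaviors (clearing the batch in a `finally` block and returning an empty array for an empty batch). It is neither the MySQL driver nor Flink code. `rebatchingFlush` shows one possible remedy, re-adding the buffered rows before every attempt, and is not necessarily the fix adopted for this issue.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: all names here are hypothetical.
class RetryFlushSketch {

    /** Stand-in for a statement that clears its batch in a finally block. */
    static class FakeStatement {
        private final List<String> batchedArgs = new ArrayList<>();
        final List<String> written = new ArrayList<>();
        private int failuresLeft; // number of executeBatch() calls that still fail

        FakeStatement(int failuresLeft) { this.failuresLeft = failuresLeft; }

        void addBatch(String row) { batchedArgs.add(row); }

        void clearBatch() { batchedArgs.clear(); }

        int[] executeBatch() throws SQLException {
            if (batchedArgs.isEmpty()) {
                return new int[0]; // empty batch: "succeeds" without writing
            }
            try {
                if (failuresLeft > 0) {
                    failuresLeft--;
                    throw new SQLException("Table 'test.gmv_table' doesn't exist");
                }
                written.addAll(batchedArgs);
                return new int[batchedArgs.size()];
            } finally {
                clearBatch(); // runs on success and on failure
            }
        }
    }

    /** Same shape as the flush() loop in the report (sleep omitted). */
    static void naiveFlush(FakeStatement stmt, int maxRetryTimes) throws SQLException {
        for (int i = 1; i <= maxRetryTimes; i++) {
            try {
                stmt.executeBatch();
                break;
            } catch (SQLException e) {
                if (i >= maxRetryTimes) throw e;
            }
        }
    }

    /** Possible remedy: keep our own buffer and rebuild the batch per attempt. */
    static void rebatchingFlush(FakeStatement stmt, List<String> pending, int maxRetryTimes)
            throws SQLException {
        for (int i = 1; i <= maxRetryTimes; i++) {
            try {
                stmt.clearBatch(); // avoid duplicates if the batch survived a failure
                for (String row : pending) stmt.addBatch(row);
                stmt.executeBatch();
                pending.clear();
                break;
            } catch (SQLException e) {
                if (i >= maxRetryTimes) throw e;
            }
        }
    }

    public static void main(String[] args) {
        FakeStatement broken = new FakeStatement(Integer.MAX_VALUE); // table never exists
        broken.addBatch("row-1");
        try {
            naiveFlush(broken, 3); // second attempt sees an empty batch
            System.out.println("naive: no exception, rows written = " + broken.written.size());
        } catch (SQLException e) {
            System.out.println("naive: " + e.getMessage());
        }

        List<String> pending = new ArrayList<>(List.of("row-1"));
        try {
            rebatchingFlush(new FakeStatement(Integer.MAX_VALUE), pending, 3);
            System.out.println("rebatching: no exception");
        } catch (SQLException e) {
            System.out.println("rebatching: " + e.getMessage());
        }
    }
}
```

With three attempts, the naive loop returns normally although nothing was written, which matches the "exception disappeared" symptom. The rebatching variant still surfaces the error after the last attempt.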

       

       

            People

            • Assignee: Leonard Xu
            • Reporter: Leonard Xu
            • Votes: 0
            • Watchers: 5


                Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 0.5h