Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1096632)
+++ conf/hive-default.xml	(working copy)
@@ -792,6 +792,18 @@
 
+<property>
+  <name>hive.stats.retries.max</name>
+  <value>0</value>
+  <description>The maximum number of retries when the stats publisher/aggregator gets an exception while updating the intermediate database. The default is 0, i.e. no retries on failure.</description>
+</property>
+
+<property>
+  <name>hive.stats.retries.wait</name>
+  <value>3000</value>
+  <description>The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures + baseWindow * (failures + 1) * (a random number in [0.0, 1.0]).</description>
+</property>
+
 <property>
   <name>hive.support.concurrency</name>
   <value>false</value>
   <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
 </property>

Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1096632)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -379,6 +379,10 @@
         30), // default timeout in sec for JDBC connection & SQL statements
     HIVE_STATS_ATOMIC("hive.stats.atomic",
         false), // whether to update metastore stats only if all stats are available
+    HIVE_STATS_RETRIES_MAX("hive.stats.retries.max",
+        0), // maximum # of retries to insert/select/delete the stats DB
+    HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait",
+        3000), // # milliseconds to wait before the next retry
 
     // Concurrency
@@ -429,7 +433,7 @@
     // temporary variable for testing. This is added just to turn off this feature in case of a bug in
     // deployment. It has not been documented in hive-default.xml intentionally, this should be removed
     // once the feature is stable
-    HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+    HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
     ;
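To get a feel for the retry cadence these two settings produce: per the formula above, the wait before the retry after the n-th failure (0-based) falls in [baseWindow*n, baseWindow*(2n+1)) milliseconds. A small standalone check, not part of the patch (the class name is made up):

    public class BackoffBounds {
      public static void main(String[] args) {
        int baseWindow = 3000; // hive.stats.retries.wait default
        for (int failures = 0; failures <= 2; failures++) {
          long min = (long) baseWindow * failures;             // fixed grace period
          long max = min + (long) baseWindow * (failures + 1); // plus the random window
          System.out.println("retry after failure #" + failures
              + ": sleep in [" + min + ", " + max + ") ms");
        }
      }
    }

With the default 3000 ms base window, this prints waits in [0, 3000), [3000, 9000), and [6000, 15000) ms for the first three retries.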
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1096632)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -43,6 +43,11 @@
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2002,4 +2007,156 @@
     sb.append(">");
     _log.info(sb);
   }
+
+  public static class SQLCommand<T> {
+    public T run(PreparedStatement stmt) throws SQLException {
+      return null;
+    }
+  }
+
+  /**
+   * Retry SQL execution with random backoff (same as the one implemented in HDFS-767).
+   * This function only retries when the SQL query throws a SQLTransientException (which
+   * might succeed on a simple retry). It doesn't retry when the exception
+   * is a SQLRecoverableException or a SQLNonTransientException. For a SQLRecoverableException
+   * the caller needs to reconnect to the database and restart the whole transaction.
+   *
+   * @param cmd the SQL command to run, wrapped in a SQLCommand.
+   * @param stmt the prepared statement the command runs against.
+   * @param baseWindow the base time window (in milliseconds) before the next retry;
+   *        see {@link #getRandomWaitTime} for details.
+   * @param maxRetries the maximum # of retries when getting a SQLTransientException.
+   * @throws SQLException throws a SQLRecoverableException or SQLNonTransientException the
+   *         first time it is caught, or a SQLTransientException once maxRetries is reached.
+   */
+  public static <T> T executeWithRetry(SQLCommand<T> cmd, PreparedStatement stmt,
+      int baseWindow, int maxRetries) throws SQLException {
+
+    Random r = new Random();
+    T result = null;
+
+    // retry up to maxRetries times before throwing the exception
+    for (int failures = 0; ; failures++) {
+      try {
+        result = cmd.run(stmt);
+        return result;
+      } catch (SQLTransientException e) {
+        LOG.warn("Failure and retry #" + failures + " with exception " + e.getMessage());
+        if (failures >= maxRetries) {
+          throw e;
+        }
+        long waitTime = getRandomWaitTime(baseWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException iex) {
+        }
+      } catch (SQLException e) {
+        // throw the other types (SQLNonTransientException / SQLRecoverableException) right away
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Retry connecting to a database with random backoff (same as the one implemented in HDFS-767).
+   * This function only retries when the connection attempt throws a SQLTransientException (which
+   * might succeed on a simple retry). It doesn't retry when the exception
+   * is a SQLRecoverableException or a SQLNonTransientException. For a SQLRecoverableException
+   * the caller needs to reconnect to the database and restart the whole transaction.
+   *
+   * @param connectionString the JDBC connection string.
+   * @param waitWindow the base time window (in milliseconds) before the next retry;
+   *        see {@link #getRandomWaitTime} for details.
+   * @param maxRetries the maximum # of retries when getting a SQLTransientException.
+   * @throws SQLException throws a SQLRecoverableException or SQLNonTransientException the
+   *         first time it is caught, or a SQLTransientException once maxRetries is reached.
+   */
+  public static Connection connectWithRetry(String connectionString,
+      int waitWindow, int maxRetries) throws SQLException {
+
+    Random r = new Random();
+
+    // retry up to maxRetries times before throwing the exception
+    for (int failures = 0; ; failures++) {
+      try {
+        Connection conn = DriverManager.getConnection(connectionString);
+        return conn;
+      } catch (SQLTransientException e) {
+        if (failures >= maxRetries) {
+          LOG.error("Error during JDBC connection. " + e);
+          throw e;
+        }
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+        }
+      } catch (SQLException e) {
+        // just throw the other types (SQLNonTransientException / SQLRecoverableException)
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Retry preparing a SQL statement with random backoff (same as the one implemented in HDFS-767).
+   * This function only retries when preparing the statement throws a SQLTransientException (which
+   * might succeed on a simple retry). It doesn't retry when the exception
+   * is a SQLRecoverableException or a SQLNonTransientException. For a SQLRecoverableException
+   * the caller needs to reconnect to the database and restart the whole transaction.
+   *
+   * @param conn a JDBC connection.
+   * @param stmt the SQL statement to be prepared.
+   * @param waitWindow the base time window (in milliseconds) before the next retry;
+   *        see {@link #getRandomWaitTime} for details.
+   * @param maxRetries the maximum # of retries when getting a SQLTransientException.
+   * @throws SQLException throws a SQLRecoverableException or SQLNonTransientException the
+   *         first time it is caught, or a SQLTransientException once maxRetries is reached.
+   */
+  public static PreparedStatement prepareWithRetry(Connection conn, String stmt,
+      int waitWindow, int maxRetries) throws SQLException {
+
+    Random r = new Random();
+
+    // retry up to maxRetries times before throwing the exception
+    for (int failures = 0; ; failures++) {
+      try {
+        return conn.prepareStatement(stmt);
+      } catch (SQLTransientException e) {
+        if (failures >= maxRetries) {
+          LOG.error("Error preparing JDBC Statement " + stmt + " :" + e);
+          throw e;
+        }
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+        }
+      } catch (SQLException e) {
+        // just throw the other types (SQLNonTransientException / SQLRecoverableException)
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Introduces a random factor into the wait time before another retry.
+   * The wait time depends on the number of failures so far and a random factor.
+   * On the first exception, the wait time is a random number in [0, baseWindow) msec.
+   * If the first retry still fails, we wait a baseWindow-msec grace period before the
+   * second retry, and the waiting window expands to 2*baseWindow msec, easing the
+   * request rate on the server. Similarly, the third retry waits a 2*baseWindow-msec
+   * grace period and its waiting window expands to 3*baseWindow msec, and so on.
+   * @param baseWindow the base waiting window.
+   * @param failures number of failures so far.
+   * @param r a random generator.
+   * @return number of milliseconds for the next wait time.
+   */
+  public static long getRandomWaitTime(int baseWindow, int failures, Random r) {
+    return (long) (
+        baseWindow * failures +     // grace period for the last round of attempts
+        baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure
+  }
 }
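Taken together, the three helpers are meant to be used in the pattern below — a minimal sketch, not part of the patch. The connection string, table, and column names are made up; only the Utilities calls come from the code above:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class RetryUsageSketch {
      public static long readCount(String connString, String key) throws SQLException {
        int waitWindow = 3000, maxRetries = 3;

        // each step retries SQLTransientExceptions internally with random backoff
        Connection conn = Utilities.connectWithRetry(connString, waitWindow, maxRetries);
        PreparedStatement sel = Utilities.prepareWithRetry(conn,
            "SELECT ROW_COUNT FROM STATS_TBL WHERE ID = ?", waitWindow, maxRetries);
        sel.setString(1, key);

        // the SQLCommand "function pointer" tells executeWithRetry what to run
        Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
          @Override
          public ResultSet run(PreparedStatement stmt) throws SQLException {
            return stmt.executeQuery();
          }
        };
        ResultSet rs = Utilities.executeWithRetry(execQuery, sel, waitWindow, maxRetries);
        return rs.next() ? rs.getLong(1) : 0L;
      }
    }

A SQLRecoverableException still propagates out of readCount(); as the javadoc says, recovering from it (reconnect and redo) is the caller's job, which is exactly what the publisher and aggregator below do.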
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java	(revision 1096632)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java	(working copy)
@@ -24,12 +24,15 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLRecoverableException;
 import java.sql.Statement;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -40,56 +43,92 @@
   private Configuration hiveconf;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private PreparedStatement selStmt, updStmt, insStmt;
-  private int timeout = 30; // default timeout in sec. for JDBC connection and statements
+  private int timeout; // default timeout in sec. for JDBC connection and statements
 
   // SQL comment that identifies where the SQL statement comes from
   private final String comment = "Hive stats publishing: " + this.getClass().getName();
+  private int maxRetries, waitWindow;
+  private final Random r;
 
   public JDBCStatsPublisher() {
-    selStmt = updStmt = insStmt = null;
+    r = new Random();
   }
 
+  @Override
   public boolean connect(Configuration hiveconf) {
+    this.hiveconf = hiveconf;
+    maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+    waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+    connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+    timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+    String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+
     try {
-      this.hiveconf = hiveconf;
-      connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
-      timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
-      String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
-      DriverManager.setLoginTimeout(timeout); // stats is non-blocking
-      conn = DriverManager.getConnection(connectionString);
+    } catch (Exception e) {
+      LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
+      return false;
+    }
 
-      // prepare the SELECT/UPDATE/INSERT statements
-      String select =
-        "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
-        " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+    // prepare the SELECT/UPDATE/INSERT statements
+    String select =
+      "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
+      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
 
-      String update =
-        "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+    String update =
+      "UPDATE /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " = ? " +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
 
-      String insert =
-        "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " VALUES (?, ?)";
+    String insert =
+      "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " VALUES (?, ?)";
 
-      selStmt = conn.prepareStatement(select);
-      updStmt = conn.prepareStatement(update);
-      insStmt = conn.prepareStatement(insert);
+    DriverManager.setLoginTimeout(timeout); // stats is non-blocking
 
-      // make the statements non-blocking
-      selStmt.setQueryTimeout(timeout);
-      updStmt.setQueryTimeout(timeout);
-      insStmt.setQueryTimeout(timeout);
+    // "function pointer" passed to executeWithRetry to call setQueryTimeout
+    Utilities.SQLCommand<Void> setQueryTimeout = new Utilities.SQLCommand<Void>() {
+      @Override
+      public Void run(PreparedStatement stmt) throws SQLException {
+        stmt.setQueryTimeout(timeout);
+        return null;
+      }
+    };
 
-      return true;
-    } catch (Exception e) {
-      LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
-      return false;
+    for (int failures = 0; ; failures++) {
+      try {
+        conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
+
+        // prepare statements
+        selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries);
+        updStmt = Utilities.prepareWithRetry(conn, update, waitWindow, maxRetries);
+        insStmt = Utilities.prepareWithRetry(conn, insert, waitWindow, maxRetries);
+
+        // set query timeout
+        Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries);
+        Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries);
+        Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries);
+
+        return true;
+      } catch (SQLRecoverableException e) {
+        if (failures >= maxRetries) {
+          LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
+          return false; // just return false without failing the task
+        }
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+        }
+      } catch (SQLException e) {
+        // for a SQLTransientException (maxRetries already exhausted inside the Utilities
+        // retry functions) or a SQLNonTransientException, declare a real failure
+        return false;
+      }
     }
   }
", e); - return false; + for (int failures = 0; ; failures++) { + try { + conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries); + + // prepare statements + selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries); + updStmt = Utilities.prepareWithRetry(conn, update, waitWindow, maxRetries); + insStmt = Utilities.prepareWithRetry(conn, insert, waitWindow, maxRetries); + + // set query timeout + Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries); + Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries); + Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries); + + return true; + } catch (SQLRecoverableException e) { + if (failures >= maxRetries) { + LOG.error("Error during JDBC connection to " + connectionString + ". ", e); + return false; // just return false without fail the task + } + long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); + try { + Thread.sleep(waitTime); + } catch (InterruptedException e1) { + } + } catch (SQLException e) { + // for SQLTransientException (maxRetries already achieved at Utilities retry functions + // or SQLNonTransientException, declare a real failure + return false; + } } } + @Override public boolean publishStat(String fileID, String statType, String value) { if (conn == null) { @@ -104,34 +143,70 @@ } LOG.info("Stats publishing for key " + fileID + ". Value = " + value); - try { + Utilities.SQLCommand execQuery = new Utilities.SQLCommand() { + @Override + public ResultSet run(PreparedStatement stmt) throws SQLException { + return stmt.executeQuery(); + } + }; - // Check to see if a previous task (mapper attempt) had published a previous stat - selStmt.setString(1, fileID); - ResultSet result = selStmt.executeQuery(); + Utilities.SQLCommand execUpdate = new Utilities.SQLCommand() { + @Override + public Void run(PreparedStatement stmt) throws SQLException { + stmt.executeUpdate(); + return null; + } + }; - if (result.next()) { - long currval = result.getLong(1); - // Only update if the previous value is smaller (i.e. the previous attempt was a fail and - // hopefully this attempt is a success (as it has a greater value). - if (currval < Long.parseLong(value)) { - updStmt.setString(1, value); - updStmt.setString(2, fileID); - updStmt.executeUpdate(); + for (int failures = 0; ; failures++) { + try { + + // Check to see if a previous task (mapper attempt) had published a previous stat + selStmt.setString(1, fileID); + ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries); + + if (result.next()) { + long currval = result.getLong(1); + // Only update if the previous value is smaller (i.e. the previous attempt was a fail and + // hopefully this attempt is a success (as it has a greater value). + if (currval < Long.parseLong(value)) { + updStmt.setString(1, value); + updStmt.setString(2, fileID); + Utilities.executeWithRetry(execUpdate, updStmt, waitWindow, maxRetries); + } + } else { + // No previous attempts. + insStmt.setString(1, fileID); + insStmt.setString(2, value); + Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries); } - } else { - // No previous attempts. 
+          insStmt.setString(1, fileID);
+          insStmt.setString(2, value);
+          Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries);
         }
-      } else {
-        // No previous attempts.
-        insStmt.setString(1, fileID);
-        insStmt.setString(2, value);
-        insStmt.executeUpdate();
+        return true;
+      } catch (SQLRecoverableException e) {
+        // need to start from scratch (connection)
+        if (failures >= maxRetries) {
+          return false;
+        }
+        // close the current connection
+        closeConnection();
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException iex) {
+        }
+        // get a new connection
+        if (!connect(hiveconf)) {
+          // if we cannot reconnect, just fail: connect() already handles retries.
+          LOG.error("Error during publishing statistics. " + e);
+          return false;
+        }
+      } catch (SQLException e) {
+        LOG.error("Error during publishing statistics. ", e);
+        return false;
       }
-      return true;
-    } catch (SQLException e) {
-      LOG.error("Error during publishing statistics. ", e);
-      return false;
     }
   }
 
+  @Override
   public boolean closeConnection() {
     if (conn == null) {
       return true;
@@ -167,6 +242,11 @@
     }
   }
 
+  /**
+   * Initialize the intermediate stats DB for the first time it is running (e.g.,
+   * creating the tables).
+   */
+  @Override
   public boolean init(Configuration hconf) {
     try {
       this.hiveconf = hconf;
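The publisher (and the aggregator after it) now retries at two levels: SQLTransientExceptions are retried in place by the Utilities helpers, while a SQLRecoverableException tears down the connection and redoes the whole publish from scratch. The toy program below exercises the same exception routing in a single loop — an illustration only; publishOnce() is a made-up stand-in for the real transaction, and only the backoff formula matches the patch:

    import java.sql.SQLException;
    import java.sql.SQLRecoverableException;
    import java.sql.SQLTransientException;
    import java.util.Random;

    public class TwoLevelRetryDemo {
      private static final Random R = new Random();

      // same formula as Utilities.getRandomWaitTime in the patch
      static long backoff(int baseWindow, int failures) {
        return (long) (baseWindow * failures + baseWindow * (failures + 1) * R.nextDouble());
      }

      // stand-in for one publish attempt: first a transient hiccup, then a dropped connection
      static void publishOnce(int attempt) throws SQLException {
        if (attempt == 0) throw new SQLTransientException("lock timeout, worth retrying");
        if (attempt == 1) throw new SQLRecoverableException("connection lost, must reconnect");
      }

      public static void main(String[] args) throws InterruptedException {
        int waitWindow = 100, maxRetries = 3;
        for (int failures = 0; ; failures++) {
          try {
            publishOnce(failures);
            System.out.println("published after " + failures + " failure(s)");
            return;
          } catch (SQLTransientException e) {
            if (failures >= maxRetries) { return; } // in the patch: retried on the same statement
          } catch (SQLRecoverableException e) {
            if (failures >= maxRetries) { return; } // in the patch: closeConnection() + connect()
          } catch (SQLException e) {
            return;                                 // non-transient: give up immediately
          }
          Thread.sleep(backoff(waitWindow, failures));
        }
      }
    }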
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java	(revision 1096632)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java	(working copy)
@@ -20,14 +20,18 @@
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLRecoverableException;
 import java.sql.Statement;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -36,25 +40,94 @@
   private Connection conn;
   private String connectionString;
   private Configuration hiveconf;
+  private PreparedStatement selStmt, delStmt;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private int timeout = 30;
   private final String comment = "Hive stats aggregation: " + this.getClass().getName();
+  private int maxRetries, waitWindow;
+  private final Random r;
 
+  public JDBCStatsAggregator() {
+    r = new Random();
+  }
+
+  @Override
   public boolean connect(Configuration hiveconf) {
+    this.hiveconf = hiveconf;
+    timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+    connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+    String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+    maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+    waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+
     try {
-      this.hiveconf = hiveconf;
-      timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
-      connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
-      String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
-      // stats is non-blocking -- throw an exception when timeout
-      DriverManager.setLoginTimeout(timeout);
-      conn = DriverManager.getConnection(connectionString);
-      return true;
     } catch (Exception e) {
-      LOG.error("Error during JDBC connection. " + e);
+      LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
       return false;
     }
+
+    // prepare the SELECT/DELETE statements
+    String select =
+      "SELECT /* " + comment + " */ " +
+      " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
+      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+    /* Automatic Cleaning:
+       IMPORTANT: Since we publish and aggregate only 1 value (1 column), the row count, it
+       is valid to delete the row after aggregation (automatic cleaning) because we know there are no
+       other values to aggregate.
+       If, in the future, other values are aggregated and published, then we cannot do this cleaning except
+       when we are sure that all values have been aggregated, or we can separate the implementation of
+       cleaning into a dedicated method which the developer has to call manually in the code.
+     */
+    String delete =
+      "DELETE /* " + comment + " */ " +
+      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+    // stats is non-blocking -- throw an exception on timeout
+    DriverManager.setLoginTimeout(timeout);
+
+    // "function pointer" passed to executeWithRetry to call setQueryTimeout
+    Utilities.SQLCommand<Void> setQueryTimeout = new Utilities.SQLCommand<Void>() {
+      @Override
+      public Void run(PreparedStatement stmt) throws SQLException {
+        stmt.setQueryTimeout(timeout);
+        return null;
+      }
+    };
+
+    // retry the connection and statement preparations
+    for (int failures = 0; ; failures++) {
+      try {
+        conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
+
+        // prepare statements
+        selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries);
+        delStmt = Utilities.prepareWithRetry(conn, delete, waitWindow, maxRetries);
+
+        // set query timeout
+        Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries);
+        Utilities.executeWithRetry(setQueryTimeout, delStmt, waitWindow, maxRetries);
+
+        return true;
+      } catch (SQLRecoverableException e) {
+        if (failures >= maxRetries) {
+          LOG.error("Error during JDBC connection and preparing statement: " + e);
+          return false;
+        }
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+        }
+      } catch (SQLException e) {
+        // for a SQLTransientException (maxRetries already exhausted inside the Utilities
+        // retry functions) or a SQLNonTransientException, declare a real failure
+        return false;
+      }
+    }
   }
 
 @@ -68,49 +141,77 @@
       return null;
     }
 
-    try {
-      long retval = 0;
-      Statement stmt = conn.createStatement();
-      stmt.setQueryTimeout(timeout);
-      String select =
-        "SELECT /* " + comment + " */ " +
-        " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
-        " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
+    Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
+      @Override
+      public ResultSet run(PreparedStatement stmt) throws SQLException {
+        return stmt.executeQuery();
+      }
+    };
 
-      ResultSet result = stmt.executeQuery(select);
-      if (result.next()) {
-        retval = result.getLong(1);
-      } else {
-        LOG.warn("Warning. Nothing published. Nothing aggregated.");
Nothing aggregated."); - return ""; + Utilities.SQLCommand execUpdate = new Utilities.SQLCommand() { + @Override + public Void run(PreparedStatement stmt) throws SQLException { + stmt.executeUpdate(); + return null; } - stmt.clearBatch(); + }; - /* Automatic Cleaning: - IMPORTANT: Since we publish and aggregate only 1 value (1 column) which is the row count, it - is valid to delete the row after aggregation (automatic cleaning) because we know that there is no - other values to aggregate. - If ;in the future; other values are aggregated and published, then we cannot do cleaning except - when we are sure that all values are aggregated, or we can separate the implementation of cleaning - through a separate method which the developer has to call it manually in the code. - */ - String delete = - "DELETE /* " + comment + " */ " + - " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME + - " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'"; - stmt.executeUpdate(delete); - stmt.close(); + for (int failures = 0; ; failures++) { + try { + long retval = 0; - LOG.info("Stats aggregator got " + retval); + String keyPrefix = fileID + "%"; + selStmt.setString(1, keyPrefix); + ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries); + if (result.next()) { + retval = result.getLong(1); + } else { + LOG.warn("Warning. Nothing published. Nothing aggregated."); + return ""; + } - return Long.toString(retval); - } catch (SQLException e) { - LOG.error("Error during publishing aggregation. " + e); - return null; + /* Automatic Cleaning: + IMPORTANT: Since we publish and aggregate only 1 value (1 column) which is the row count, it + is valid to delete the row after aggregation (automatic cleaning) because we know that there is no + other values to aggregate. + If in the future; other values are aggregated and published, then we cannot do cleaning except + when we are sure that all values are aggregated, or we can separate the implementation of cleaning + through a separate method which the developer has to call it manually in the code. + */ + delStmt.setString(1, keyPrefix); + Utilities.executeWithRetry(execUpdate, delStmt, waitWindow, maxRetries); + + LOG.info("Stats aggregator got " + retval); + + return Long.toString(retval); + } catch (SQLRecoverableException e) { + // need to start from scratch (connection) + if (failures >= maxRetries) { + return null; + } + // close the current connection + closeConnection(); + long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); + try { + Thread.sleep(waitTime); + } catch (InterruptedException iex) { + } + // getting a new connection + if (!connect(hiveconf)) { + // if cannot reconnect, just fail because connect() already handles retries. + LOG.error("Error during publishing aggregation. " + e); + return null; + } + } catch (SQLException e) { + // for SQLTransientException (already handled by Utilities.*WithRetries() functions + // and SQLNonTransientException, just declare failure. + LOG.error("Error during publishing aggregation. " + e); + return null; + } } } + @Override public boolean closeConnection() { if (conn == null) { @@ -136,6 +237,7 @@ } } + @Override public boolean cleanUp(String rowID) { try { Statement stmt = conn.createStatement();