Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1095959)
+++ conf/hive-default.xml	(working copy)
@@ -786,6 +786,18 @@
 
+<property>
+  <name>hive.stats.retries.max</name>
+  <value>0</value>
+  <description>Maximum number of retries when the stats publisher/aggregator gets an exception while updating the intermediate database. The default is no retries on failure.</description>
+</property>
+
+<property>
+  <name>hive.stats.retries.wait</name>
+  <value>3000</value>
+  <description>The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures + baseWindow * (failures + 1) * (random number in [0.0, 1.0]).</description>
+</property>
+
 <property>
   <name>hive.support.concurrency</name>
   <value>false</value>
   <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
 </property>

Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1095959)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -378,6 +378,10 @@
         30), // default timeout in sec for JDBC connection & SQL statements
     HIVE_STATS_ATOMIC("hive.stats.atomic",
         false), // whether to update metastore stats only if all stats are available
+    HIVE_STATS_RETRIES_MAX("hive.stats.retries.max",
+        0), // maximum # of retries when inserting/selecting/deleting from the stats DB
+    HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait",
+        3000), // base waiting window (in milliseconds) before the next retry
 
     // Concurrency
@@ -428,7 +432,7 @@
     // temporary variable for testing. This is added just to turn off this feature in case of a bug in
     // deployment. It has not been documented in hive-default.xml intentionally, this should be removed
     // once the feature is stable
-    HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+    HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
     ;
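For reference, the formula in the hive.stats.retries.wait description grows the retry window linearly with the number of failures. A minimal standalone sketch (illustrative only, not part of this patch; the class name is invented) prints sample wait times for the default 3000 ms base window:

import java.util.Random;

public class StatsRetryBackoffDemo {
  public static void main(String[] args) {
    int baseWindow = 3000; // hive.stats.retries.wait
    Random r = new Random();
    for (int failures = 1; failures <= 3; failures++) {
      // same formula as Utilities.getRandomWaitTime() below
      long waitTime = (long) (baseWindow * failures
          + baseWindow * (failures + 1) * r.nextDouble());
      System.out.println("retry #" + failures + ": wait " + waitTime + " ms");
    }
  }
}

With failures = 1, 2, 3 the wait falls in [3000, 9000), [6000, 15000), and [9000, 21000) ms respectively, so concurrent task attempts spread their retries out instead of hitting the stats DB at the same instant.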
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1095959)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -43,6 +43,9 @@
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2002,4 +2005,66 @@
     sb.append(">");
     _log.info(sb);
   }
+
+  public static enum SQLCommandType { QUERY, UPDATE }
+
+  /**
+   * Retry SQL execution with random backoff (same as the one implemented in HDFS-767).
+   */
+  public static ResultSet executeWithRetry(PreparedStatement query, SQLCommandType type,
+      int baseWindow, int maxRetries) throws SQLException {
+
+    Random r = new Random();
+    ResultSet result = null;
+
+    // retry up to maxRetries times before rethrowing the exception
+    int failures = 0;
+    while (true) {
+      try {
+        switch (type) {
+        case QUERY:
+          result = query.executeQuery();
+          return result;
+        case UPDATE:
+          query.executeUpdate();
+          return null;
+        default:
+          assert(false); // should not be reached
+        }
+      } catch (SQLException e) {
+        failures++;
+        if (failures > maxRetries) {
+          throw e;
+        }
+        LOG.warn("Failure #" + failures + "; retrying after exception: " + e.getMessage());
+        long waitTime = getRandomWaitTime(baseWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException iex) {
+          // ignored; fall through to the next retry
+        }
+      }
+    }
+  }
+
+  /**
+   * Introduce a random factor into the wait time before another retry.
+   * The wait time depends on the number of failures so far and a random factor.
+   * On the first exception, the wait time is a random number in [0, baseWindow) msec.
+   * If the first retry still fails, we wait a baseWindow msec grace period before the
+   * second retry, and the waiting window expands to 2*baseWindow msec, lowering the
+   * request rate on the server. Similarly, the third retry waits a 2*baseWindow msec
+   * grace period before retrying, the waiting window expands to 3*baseWindow msec,
+   * and so on.
+   * @param baseWindow the base waiting window.
+   * @param failures number of failures so far.
+   * @param r a random generator.
+   * @return number of milliseconds for the next wait time.
+   */
+  public static long getRandomWaitTime(int baseWindow, int failures, Random r) {
+    return (long) (
+        baseWindow * failures +     // grace period for the last round of attempts
+        baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure
+  }
 }
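As a usage sketch of the new helper (a hypothetical caller, not part of this patch; the table and column names are made up), a statement is prepared once and then routed through executeWithRetry:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.hive.ql.exec.Utilities;

public class RetryUsageSketch {
  static long readRowCount(Connection conn, String fileID) throws SQLException {
    PreparedStatement sel = conn.prepareStatement(
        "SELECT ROW_COUNT FROM PARTITION_STAT_TBL WHERE ID = ?");
    sel.setString(1, fileID);
    // QUERY returns the ResultSet; UPDATE executes and returns null
    ResultSet rs = Utilities.executeWithRetry(sel, Utilities.SQLCommandType.QUERY,
        3000 /* waitWindow (ms) */, 3 /* maxRetries */);
    return rs.next() ? rs.getLong(1) : 0L;
  }
}

If all maxRetries attempts fail, the last SQLException propagates to the caller, which can then decide whether the failure is fatal.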
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java	(revision 1095959)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java	(working copy)
@@ -25,11 +25,13 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 
@@ -40,56 +42,78 @@
   private Configuration hiveconf;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private PreparedStatement selStmt, updStmt, insStmt;
-  private int timeout = 30; // default timeout in sec. for JDBC connection and statements
+  private int timeout; // timeout in sec. for JDBC connection and statements
 
   // SQL comment that identifies where the SQL statement comes from
   private final String comment = "Hive stats publishing: " + this.getClass().getName();
+  private int maxRetries, waitWindow;
+  private final Random r;
 
   public JDBCStatsPublisher() {
     selStmt = updStmt = insStmt = null;
+    r = new Random();
   }
 
+  @Override
   public boolean connect(Configuration hiveconf) {
+    this.hiveconf = hiveconf;
+    maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+    waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+    connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+    timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+    String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+
     try {
-      this.hiveconf = hiveconf;
-      connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
-      timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
-      String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
-      DriverManager.setLoginTimeout(timeout); // stats is non-blocking
-      conn = DriverManager.getConnection(connectionString);
+    } catch (Exception e) {
+      LOG.error("Error instantiating JDBC driver " + driver + ". ", e);
+      return false;
+    }
 
-      // prepare the SELECT/UPDATE/INSERT statements
-      String select =
-        "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
-        " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+    // prepare the SELECT/UPDATE/INSERT statements
+    String select =
+      "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
+      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
 
-      String update =
-        "UPDATE /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " = ? " +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+    String update =
+      "UPDATE /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " = ? " +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
 
-      String insert =
-        "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " VALUES (?, ?)";
+    String insert =
+      "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " VALUES (?, ?)";
 
-      selStmt = conn.prepareStatement(select);
-      updStmt = conn.prepareStatement(update);
-      insStmt = conn.prepareStatement(insert);
+    DriverManager.setLoginTimeout(timeout); // stats is non-blocking
 
-      // make the statements non-blocking
-      selStmt.setQueryTimeout(timeout);
-      updStmt.setQueryTimeout(timeout);
-      insStmt.setQueryTimeout(timeout);
+    int failures = 0;
+    while (true) {
+      try {
+        conn = DriverManager.getConnection(connectionString);
+        selStmt = conn.prepareStatement(select);
+        updStmt = conn.prepareStatement(update);
+        insStmt = conn.prepareStatement(insert);
 
-      return true;
-    } catch (Exception e) {
-      LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
-      return false;
+        // make the statements non-blocking
+        selStmt.setQueryTimeout(timeout);
+        updStmt.setQueryTimeout(timeout);
+        insStmt.setQueryTimeout(timeout);
+        return true;
+      } catch (SQLException e) {
+        failures++;
+        if (failures > maxRetries) {
+          LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
+          return false; // just return false without failing the task
+        }
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+          // ignored; continue with the next connection attempt
        }
      }
    }
  }
 
+  @Override
   public boolean publishStat(String fileID, String statType, String value) {
     if (conn == null) {
@@ -108,7 +132,8 @@
 
       // Check to see if a previous task (mapper attempt) had published a previous stat
       selStmt.setString(1, fileID);
-      ResultSet result = selStmt.executeQuery();
+      ResultSet result = Utilities.executeWithRetry(selStmt, Utilities.SQLCommandType.QUERY,
+          waitWindow, maxRetries);
 
       if (result.next()) {
         long currval = result.getLong(1);
@@ -117,13 +142,15 @@
         if (currval < Long.parseLong(value)) {
           updStmt.setString(1, value);
           updStmt.setString(2, fileID);
-          updStmt.executeUpdate();
+          Utilities.executeWithRetry(updStmt, Utilities.SQLCommandType.UPDATE,
+              waitWindow, maxRetries);
         }
       } else {
         // No previous attempts.
         insStmt.setString(1, fileID);
         insStmt.setString(2, value);
-        insStmt.executeUpdate();
+        Utilities.executeWithRetry(insStmt, Utilities.SQLCommandType.UPDATE,
+            waitWindow, maxRetries);
       }
       return true;
     } catch (SQLException e) {
@@ -132,6 +159,7 @@
     }
   }
 
+  @Override
   public boolean closeConnection() {
     if (conn == null) {
       return true;
@@ -167,6 +195,11 @@
     }
   }
 
+  /**
+   * Initialize the intermediate stats DB the first time it runs (e.g.,
+   * by creating the tables).
+   */
+  @Override
   public boolean init(Configuration hconf) {
     try {
       this.hiveconf = hconf;
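To illustrate the intended lifecycle of the patched publisher (hypothetical driver code, not part of this patch; the key and stat names are made up), connect() now retries internally, and a publish failure still only returns false rather than failing the task:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsPublisher;

public class PublisherSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("hive.stats.retries.max", 3);     // retry up to 3 times
    conf.setInt("hive.stats.retries.wait", 3000); // 3 s base window

    StatsPublisher publisher = new JDBCStatsPublisher();
    if (publisher.connect(conf)) {
      // errors are retried internally; a final failure returns false
      publisher.publishStat("partition_0001", "numRows", "1000");
      publisher.closeConnection();
    }
  }
}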
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java	(revision 1095959)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java	(working copy)
@@ -20,14 +20,17 @@
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 
@@ -36,25 +39,80 @@
   private Connection conn;
   private String connectionString;
   private Configuration hiveconf;
+  private PreparedStatement selStmt, delStmt;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private int timeout = 30;
   private final String comment = "Hive stats aggregation: " + this.getClass().getName();
+  private int maxRetries, waitWindow;
+  private final Random r;
 
+  public JDBCStatsAggregator() {
+    r = new Random();
+  }
+
+  @Override
   public boolean connect(Configuration hiveconf) {
+    this.hiveconf = hiveconf;
+    timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+    connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+    String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+    maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+    waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+
     try {
-      this.hiveconf = hiveconf;
-      timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
-      connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
-      String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
-      // stats is non-blocking -- throw an exception when timeout
-      DriverManager.setLoginTimeout(timeout);
-      conn = DriverManager.getConnection(connectionString);
-      return true;
     } catch (Exception e) {
-      LOG.error("Error during JDBC connection. " + e);
+      LOG.error("Error instantiating JDBC driver " + driver + ". ", e);
       return false;
     }
+
+    // prepare the SELECT/DELETE statements
+    String select =
+      "SELECT /* " + comment + " */ " +
+      " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
+      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+    /* Automatic Cleaning:
+       IMPORTANT: Since we publish and aggregate only one value (one column), the row count, it
+       is valid to delete the row after aggregation (automatic cleaning) because we know there are
+       no other values to aggregate.
+       If, in the future, other values are aggregated and published, then we cannot do this cleaning
+       except when we are sure that all values have been aggregated, or we can separate the cleaning
+       into a method that the developer has to call manually in the code.
+     */
+    String delete =
+      "DELETE /* " + comment + " */ " +
+      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+    // stats is non-blocking -- throw an exception on timeout
+    DriverManager.setLoginTimeout(timeout);
+
+    int failures = 0;
+    while (true) {
+      try {
+        conn = DriverManager.getConnection(connectionString);
+        selStmt = conn.prepareStatement(select);
+        delStmt = conn.prepareStatement(delete);
+
+        // make the statements non-blocking
+        selStmt.setQueryTimeout(timeout);
+        delStmt.setQueryTimeout(timeout);
+        return true;
+      } catch (Exception e) {
+        failures++;
+        if (failures > maxRetries) {
+          LOG.error("Error during JDBC connection. " + e);
+          return false;
+        }
+        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+          // ignored; continue with the next connection attempt
+        }
+      }
+    }
   }
 
   @Override
@@ -70,37 +128,29 @@
 
     try {
       long retval = 0;
-      Statement stmt = conn.createStatement();
-      stmt.setQueryTimeout(timeout);
-      String select =
-        "SELECT /* " + comment + " */ " +
-        " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
-        " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
-      ResultSet result = stmt.executeQuery(select);
+      String keyPrefix = fileID + "%";
+      selStmt.setString(1, keyPrefix);
+      ResultSet result = Utilities.executeWithRetry(selStmt, Utilities.SQLCommandType.QUERY,
+          waitWindow, maxRetries);
 
       if (result.next()) {
         retval = result.getLong(1);
       } else {
         LOG.warn("Warning. Nothing published. Nothing aggregated.");
         return "";
       }
-      stmt.clearBatch();
 
       /* Automatic Cleaning:
        IMPORTANT: Since we publish and aggregate only one value (one column), the row count, it
        is valid to delete the row after aggregation (automatic cleaning) because we know there are
        no other values to aggregate.
-       If ;in the future; other values are aggregated and published, then we cannot do cleaning except
+       If, in the future, other values are aggregated and published, then we cannot do cleaning except
        when we are sure that all values are aggregated, or we can separate the implementation of cleaning
        through a separate method which the developer has to call manually in the code.
       */
-      String delete =
-        "DELETE /* " + comment + " */ " +
-        " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-        " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
-      stmt.executeUpdate(delete);
-      stmt.close();
+      delStmt.setString(1, keyPrefix);
+      Utilities.executeWithRetry(delStmt, Utilities.SQLCommandType.UPDATE,
+          waitWindow, maxRetries);
 
       LOG.info("Stats aggregator got " + retval);
 
@@ -111,6 +161,7 @@
     }
   }
 
+  @Override
   public boolean closeConnection() {
     if (conn == null) {
@@ -136,6 +187,7 @@
     }
   }
 
+  @Override
   public boolean cleanUp(String rowID) {
     try {
       Statement stmt = conn.createStatement();
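For completeness, a matching sketch of the aggregator side (hypothetical driver code, not part of this patch; the aggregateStats signature is assumed from the StatsAggregator interface shown above, and the key name is made up). All attempts of a task publish under a common key prefix; aggregateStats binds prefix + "%" to the LIKE parameter, SUMs the row counts, and then deletes the rows (the automatic cleaning described above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsAggregator;

public class AggregatorSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    StatsAggregator aggregator = new JDBCStatsAggregator();
    if (aggregator.connect(conf)) {
      // sums all rows whose ID starts with "partition_0001", then deletes them
      String rowCount = aggregator.aggregateStats("partition_0001", "numRows");
      System.out.println("aggregated row count: " + rowCount);
      aggregator.closeConnection();
    }
  }
}

Binding the prefix as a PreparedStatement parameter (rather than concatenating it into the SQL string, as the old code did) also avoids quoting problems if a key ever contains special characters.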