Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1095959)
+++ conf/hive-default.xml (working copy)
@@ -786,6 +786,18 @@
+<property>
+  <name>hive.stats.retries.max</name>
+  <value>0</value>
+  <description>Maximum number of retries when the stats publisher/aggregator gets an exception while updating the intermediate database. The default of 0 means no retries on failure.</description>
+</property>
+
+<property>
+  <name>hive.stats.retries.wait</name>
+  <value>3000</value>
+  <description>The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures + baseWindow * (failures + 1) * (a random number in [0.0, 1.0)).</description>
+</property>
+
   <name>hive.support.concurrency</name>
   <value>false</value>
   <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
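For concreteness, a worked example of the backoff formula above, using the default base window of 3000 ms: after the first failure the wait is 3000 * 1 + 3000 * 2 * rand, i.e. somewhere in [3000, 9000) ms; after the second failure it is 3000 * 2 + 3000 * 3 * rand, i.e. in [6000, 15000) ms. The expected wait thus grows linearly with the number of consecutive failures.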
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1095959)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -378,6 +378,10 @@
30), // default timeout in sec for JDBC connection & SQL statements
HIVE_STATS_ATOMIC("hive.stats.atomic",
false), // whether to update metastore stats only if all stats are available
+ HIVE_STATS_RETRIES_MAX("hive.stats.retries.max",
+ 0), // maximum # of retries for insert/select/delete operations against the stats DB
+ HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait",
+ 3000), // base waiting window in milliseconds before the next retry
// Concurrency
@@ -428,7 +432,7 @@
// temporary variable for testing. This is added just to turn off this feature in case of a bug in
// deployment. It has not been documented in hive-default.xml intentionally, this should be removed
// once the feature is stable
- HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+ HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
;
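
For reference, downstream code reads these two settings like any other integer conf var; the publisher and aggregator changes below do exactly this:

    int maxRetries = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
    int waitWindow = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);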
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1095959)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -43,6 +43,9 @@
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -2002,4 +2005,66 @@
sb.append(">");
_log.info(sb);
}
+
+ public static enum SQLCommandType { QUERY, UPDATE };
+
+ /**
+ * Retry SQL execution with a randomized backoff (the same scheme as implemented in HDFS-767).
+ */
+ public static ResultSet executeWithRetry(PreparedStatement query, SQLCommandType type,
+ int baseWindow, int maxRetries) throws SQLException {
+
+ Random r = new Random();
+ ResultSet result = null;
+
+ // retry up to maxRetries times before rethrowing the exception
+ int failures = 0;
+ while (true) {
+ try {
+ switch (type) {
+ case QUERY:
+ result = query.executeQuery();
+ return result;
+ case UPDATE:
+ query.executeUpdate();
+ return null;
+ default:
+ throw new IllegalArgumentException("Unknown SQL command type: " + type);
+ }
+ } catch (SQLException e) {
+ failures++;
+ if (failures > maxRetries) {
+ throw e;
+ }
+ LOG.warn("Failure #" + failures + " executing statement, will retry: " + e.getMessage());
+ long waitTime = getRandomWaitTime(baseWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException iex) {
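+ // interrupt ignored; proceed to the next retry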
+ }
+ }
+ }
+ }
+
+ /**
+ * Introduce a random factor into the wait time before another retry.
+ * The wait time depends on the number of failures so far plus a random component.
+ * After the i-th failure, the caller waits a grace period of i * baseWindow msec
+ * for the attempts made so far, plus a random component drawn from
+ * [0, (i + 1) * baseWindow) msec. The waiting window thus expands with every
+ * failure, backing off the request rate to the server: after the first failure
+ * the wait falls in [baseWindow, 3 * baseWindow) msec, after the second in
+ * [2 * baseWindow, 5 * baseWindow) msec, and so on.
+ * @param baseWindow the base waiting window.
+ * @param failures number of failures so far.
+ * @param r a random generator.
+ * @return number of milliseconds to wait before the next attempt.
+ */
+ public static long getRandomWaitTime(int baseWindow, int failures, Random r) {
+ return (long) (
+ baseWindow * failures + // grace period for the last round of attempt
+ baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure
+ }
}
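
For concreteness, a minimal caller-side sketch of the helper above; the statement text and table name here are hypothetical, not part of the patch:

    // Assumes an open Connection conn plus conf-derived timeout, waitWindow and
    // maxRetries, as in the publisher/aggregator changes below.
    PreparedStatement sel = conn.prepareStatement(
        "SELECT ROW_COUNT FROM PART_STATS WHERE ID = ?"); // hypothetical table
    sel.setQueryTimeout(timeout);
    sel.setString(1, fileID);
    // Retries up to maxRetries times on SQLException, sleeping
    // Utilities.getRandomWaitTime(waitWindow, failures, r) msec between attempts,
    // and rethrows the last SQLException once the retry budget is exhausted.
    ResultSet rs = Utilities.executeWithRetry(sel, Utilities.SQLCommandType.QUERY,
        waitWindow, maxRetries);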
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (revision 1095959)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (working copy)
@@ -25,11 +25,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -40,56 +42,78 @@
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private PreparedStatement selStmt, updStmt, insStmt;
- private int timeout = 30; // default timeout in sec. for JDBC connection and statements
+ private int timeout; // timeout in sec. for JDBC connection and statements, set from conf
// SQL comment that identifies where the SQL statement comes from
private final String comment = "Hive stats publishing: " + this.getClass().getName();
+ private int maxRetries, waitWindow;
+ private final Random r;
-
public JDBCStatsPublisher() {
- selStmt = updStmt = insStmt = null;
+ r = new Random();
}
+ @Override
public boolean connect(Configuration hiveconf) {
+ this.hiveconf = hiveconf;
+ maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+ waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+ connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+ String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+
try {
- this.hiveconf = hiveconf;
- connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
- timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
- String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- DriverManager.setLoginTimeout(timeout); // stats is non-blocking
- conn = DriverManager.getConnection(connectionString);
+ } catch (Exception e) {
+ LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
+ return false;
+ }
- // prepare the SELECT/UPDATE/INSERT statements
- String select =
- "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+ // prepare the SELECT/UPDATE/INSERT statements
+ String select =
+ "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
+ " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
- String update =
- "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+ String update =
+ "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
- String insert =
- "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " VALUES (?, ?)";
+ String insert =
+ "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " VALUES (?, ?)";
- selStmt = conn.prepareStatement(select);
- updStmt = conn.prepareStatement(update);
- insStmt = conn.prepareStatement(insert);
+ DriverManager.setLoginTimeout(timeout); // stats is non-blocking
- // make the statements non-blocking
- selStmt.setQueryTimeout(timeout);
- updStmt.setQueryTimeout(timeout);
- insStmt.setQueryTimeout(timeout);
+ int failures = 0;
+ while (true) {
+ try {
+ conn = DriverManager.getConnection(connectionString);
+ selStmt = conn.prepareStatement(select);
+ updStmt = conn.prepareStatement(update);
+ insStmt = conn.prepareStatement(insert);
- return true;
- } catch (Exception e) {
- LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
- return false;
+ // make the statements non-blocking
+ selStmt.setQueryTimeout(timeout);
+ updStmt.setQueryTimeout(timeout);
+ insStmt.setQueryTimeout(timeout);
+ return true;
+ } catch (SQLException e) {
+ failures++;
+ if (failures > maxRetries) {
+ LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
+ return false; // just return false without failing the task
+ }
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
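+ // interrupt ignored; proceed to the next connection attempt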
+ }
+ }
}
}
+ @Override
public boolean publishStat(String fileID, String statType, String value) {
if (conn == null) {
@@ -108,7 +132,8 @@
// Check to see if a previous task (mapper attempt) had published a previous stat
selStmt.setString(1, fileID);
- ResultSet result = selStmt.executeQuery();
+ ResultSet result = Utilities.executeWithRetry(selStmt, Utilities.SQLCommandType.QUERY,
+ waitWindow, maxRetries);
if (result.next()) {
long currval = result.getLong(1);
@@ -117,13 +142,15 @@
if (currval < Long.parseLong(value)) {
updStmt.setString(1, value);
updStmt.setString(2, fileID);
- updStmt.executeUpdate();
+ Utilities.executeWithRetry(updStmt, Utilities.SQLCommandType.UPDATE,
+ waitWindow, maxRetries);
}
} else {
// No previous attempts.
insStmt.setString(1, fileID);
insStmt.setString(2, value);
- insStmt.executeUpdate();
+ Utilities.executeWithRetry(insStmt, Utilities.SQLCommandType.UPDATE,
+ waitWindow, maxRetries);
}
return true;
} catch (SQLException e) {
@@ -132,6 +159,7 @@
}
}
+ @Override
public boolean closeConnection() {
if (conn == null) {
return true;
@@ -167,6 +195,11 @@
}
}
+ /**
+ * Initialize the intermediate stats DB (e.g., create its tables) the first
+ * time it is used.
+ */
+ @Override
public boolean init(Configuration hconf) {
try {
this.hiveconf = hconf;
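
Note that with hive.stats.retries.max left at its default of 0, connect() keeps the old behavior: the first SQLException is logged and connect() returns false, so the stats update is skipped without failing the task. To actually enable retries, a user would set, for example, hive.stats.retries.max=5 (and optionally tune hive.stats.retries.wait) in hive-site.xml or via the CLI's set command; the values here are illustrative, not recommendations.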
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (revision 1095959)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (working copy)
@@ -20,14 +20,17 @@
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -36,25 +39,80 @@
private Connection conn;
private String connectionString;
private Configuration hiveconf;
+ private PreparedStatement selStmt, delStmt;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private int timeout = 30;
private final String comment = "Hive stats aggregation: " + this.getClass().getName();
+ private int maxRetries, waitWindow;
+ private final Random r;
+ public JDBCStatsAggregator() {
+ r = new Random();
+ }
+
+ @Override
public boolean connect(Configuration hiveconf) {
+ this.hiveconf = hiveconf;
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+ connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+ maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+ waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+
try {
- this.hiveconf = hiveconf;
- timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
- connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
- String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- // stats is non-blocking -- throw an exception when timeout
- DriverManager.setLoginTimeout(timeout);
- conn = DriverManager.getConnection(connectionString);
- return true;
} catch (Exception e) {
- LOG.error("Error during JDBC connection. " + e);
+ LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
return false;
}
+
+ // prepare the SELECT/DELETE statements
+ String select =
+ "SELECT /* " + comment + " */ " +
+ " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
+ " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+ /* Automatic Cleaning:
+ IMPORTANT: Since we publish and aggregate only one value (one column), the row count, it
+ is valid to delete the row after aggregation (automatic cleaning) because we know there are
+ no other values to aggregate.
+ If, in the future, other values are aggregated and published, then we cannot do cleaning except
+ when we are sure that all values are aggregated, or we can separate the implementation of cleaning
+ into a separate method which the developer has to call manually in the code.
+ */
+ String delete =
+ "DELETE /* " + comment + " */ " +
+ " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+ // stats is non-blocking -- throw an exception when timeout
+ DriverManager.setLoginTimeout(timeout);
+
+ int failures = 0;
+ while (true) {
+ try {
+ conn = DriverManager.getConnection(connectionString);
+ selStmt = conn.prepareStatement(select);
+ delStmt = conn.prepareStatement(delete);
+
+ // make the statements non-blocking
+ selStmt.setQueryTimeout(timeout);
+ delStmt.setQueryTimeout(timeout);
+ return true;
+ } catch (Exception e) {
+ failures++;
+ if (failures > maxRetries) {
+ LOG.error("Error during JDBC connection. " + e);
+ return false;
+ }
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
}
@Override
@@ -70,37 +128,29 @@
try {
long retval = 0;
- Statement stmt = conn.createStatement();
- stmt.setQueryTimeout(timeout);
- String select =
- "SELECT /* " + comment + " */ " +
- " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
- ResultSet result = stmt.executeQuery(select);
+ String keyPrefix = fileID + "%";
+ selStmt.setString(1, keyPrefix);
+ ResultSet result = Utilities.executeWithRetry(selStmt, Utilities.SQLCommandType.QUERY,
+ waitWindow, maxRetries);
if (result.next()) {
retval = result.getLong(1);
} else {
LOG.warn("Warning. Nothing published. Nothing aggregated.");
return "";
}
- stmt.clearBatch();
/* Automatic Cleaning:
IMPORTANT: Since we publish and aggregate only 1 value (1 column) which is the row count, it
is valid to delete the row after aggregation (automatic cleaning) because we know that there is no
other values to aggregate.
- If ;in the future; other values are aggregated and published, then we cannot do cleaning except
+ If, in the future, other values are aggregated and published, then we cannot do cleaning except
when we are sure that all values are aggregated, or we can separate the implementation of cleaning
through a separate method which the developer has to call it manually in the code.
*/
- String delete =
- "DELETE /* " + comment + " */ " +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
- stmt.executeUpdate(delete);
- stmt.close();
+ delStmt.setString(1, keyPrefix);
+ Utilities.executeWithRetry(delStmt, Utilities.SQLCommandType.UPDATE,
+ waitWindow, maxRetries);
LOG.info("Stats aggregator got " + retval);
@@ -111,6 +161,7 @@
}
}
+ @Override
public boolean closeConnection() {
if (conn == null) {
@@ -136,6 +187,7 @@
}
}
+ @Override
public boolean cleanUp(String rowID) {
try {
Statement stmt = conn.createStatement();
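
One design note on the aggregator change above: the SELECT/DELETE used to be built by string concatenation ("... LIKE '" + fileID + "%'") on a fresh Statement per call; they are now prepared once in connect() and bound with keyPrefix = fileID + "%", which avoids re-parsing the SQL on every aggregation and sidesteps quoting problems in fileID. A hypothetical call sequence for illustration:

    // keyPrefix matches every per-task row published under this fileID
    selStmt.setString(1, fileID + "%");
    ResultSet rs = Utilities.executeWithRetry(selStmt,
        Utilities.SQLCommandType.QUERY, waitWindow, maxRetries);
    long total = rs.next() ? rs.getLong(1) : 0L; // SUM(ROW_COUNT) over all matches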