Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1096632)
+++ conf/hive-default.xml (working copy)
@@ -792,6 +792,18 @@
+<property>
+  <name>hive.stats.retries.max</name>
+  <value>0</value>
+  <description>Maximum number of retries when the stats publisher/aggregator gets an exception while updating the intermediate database. The default is no retries on failures.</description>
+</property>
+
+<property>
+  <name>hive.stats.retries.wait</name>
+  <value>3000</value>
+  <description>The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures + baseWindow * (failures + 1) * (a random number in [0.0, 1.0]).</description>
+</property>
+
<property>
  <name>hive.support.concurrency</name>
  <value>false</value>
  <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
</property>
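
Note: the backoff described above can be sanity-checked with a small standalone snippet (illustrative only, not part of the patch); it hard-codes the default hive.stats.retries.wait of 3000 ms and mirrors the formula from the description:

  import java.util.Random;

  public class BackoffDemo {
    public static void main(String[] args) {
      int baseWindow = 3000; // default hive.stats.retries.wait
      Random r = new Random();
      for (int failures = 0; failures < 3; failures++) {
        // grace period grows linearly; the random window expands with each failure
        long wait = (long) (baseWindow * failures
            + baseWindow * (failures + 1) * r.nextDouble());
        System.out.println("after failure #" + failures + ": wait " + wait + " ms");
      }
    }
  }

With the defaults, the first retry waits 0-3 seconds, the second 3-9 seconds, and the third 6-15 seconds.
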
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1096632)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -379,6 +379,10 @@
30), // default timeout in sec for JDBC connection & SQL statements
HIVE_STATS_ATOMIC("hive.stats.atomic",
false), // whether to update metastore stats only if all stats are available
+ HIVE_STATS_RETRIES_MAX("hive.stats.retries.max",
+ 0), // maximum # of retries to insert/select/delete the stats DB
+ HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait",
+ 3000), // # milliseconds to wait before the next retry
// Concurrency
@@ -429,7 +433,7 @@
// temporary variable for testing. This is added just to turn off this feature in case of a bug in
// deployment. It has not been documented in hive-default.xml intentionally, this should be removed
// once the feature is stable
- HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+ HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1096632)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -43,6 +43,11 @@
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -2002,4 +2007,156 @@
sb.append(">");
_log.info(sb);
}
+
+ public static class SQLCommand<T> {
+ public T run(PreparedStatement stmt) throws SQLException {
+ return null;
+ }
+ }
+
+ /**
+ * Retry SQL execution with random backoff (same as the one implemented in HDFS-767).
+ * This function only retries when the SQL query throws a SQLTransientException (which
+ * might be able to succeed with a simple retry). It doesn't retry when the exception
+ * is a SQLRecoverableException or SQLNonTransientException. For SQLRecoverableException
+ * the caller needs to reconnect to the database and restart the whole transaction.
+ *
+ * @param cmd the SQL command to run against the prepared statement.
+ * @param stmt the prepared statement.
+ * @param baseWindow The base time window (in milliseconds) before the next retry;
+ * see {@link #getRandomWaitTime} for details.
+ * @param maxRetries the maximum # of retries when getting a SQLTransientException.
+ * @throws SQLException throws SQLRecoverableException or SQLNonTransientException the
+ * first time it is caught, or SQLTransientException when maxRetries has been reached.
+ */
+ public static <T> T executeWithRetry(SQLCommand<T> cmd, PreparedStatement stmt,
+ int baseWindow, int maxRetries) throws SQLException {
+
+ Random r = new Random();
+ T result = null;
+
+ // retry with # of maxRetries before throwing exception
+ for (int failures = 0; ; failures++) {
+ try {
+ result = cmd.run(stmt);
+ return result;
+ } catch (SQLTransientException e) {
+ LOG.warn("Failure and retry #" + failures + " with exception " + e.getMessage());
+ if (failures >= maxRetries) {
+ throw e;
+ }
+ long waitTime = getRandomWaitTime(baseWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException iex) {
+ }
+ } catch (SQLException e) {
+ // throw other types of SQLExceptions (SQLNonTransientException / SQLRecoverableException)
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Retry connecting to a database with random backoff (same as the one implemented in HDFS-767).
+ * This function only retries when the connection attempt throws a SQLTransientException (which
+ * might be able to succeed with a simple retry). It doesn't retry when the exception
+ * is a SQLRecoverableException or SQLNonTransientException. For SQLRecoverableException
+ * the caller needs to reconnect to the database and restart the whole transaction.
+ *
+ * @param connectionString the JDBC connection string.
+ * @param waitWindow The base time window (in milliseconds) before the next retry;
+ * see {@link #getRandomWaitTime} for details.
+ * @param maxRetries the maximum # of retries when getting a SQLTransientException.
+ * @throws SQLException throws SQLRecoverableException or SQLNonTransientException the
+ * first time it is caught, or SQLTransientException when maxRetries has been reached.
+ */
+ public static Connection connectWithRetry(String connectionString,
+ int waitWindow, int maxRetries) throws SQLException {
+
+ Random r = new Random();
+
+ // retry with # of maxRetries before throwing exception
+ for (int failures = 0; ; failures++) {
+ try {
+ Connection conn = DriverManager.getConnection(connectionString);
+ return conn;
+ } catch (SQLTransientException e) {
+ if (failures >= maxRetries) {
+ LOG.error("Error during JDBC connection. " + e);
+ throw e;
+ }
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
+ }
+ } catch (SQLException e) {
+ // just throw other types (SQLNonTransientException / SQLRecoverableException)
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Retry preparing a SQL statement with random backoff (same as the one implemented in HDFS-767).
+ * This function only retries when preparing the statement throws a SQLTransientException (which
+ * might be able to succeed with a simple retry). It doesn't retry when the exception
+ * is a SQLRecoverableException or SQLNonTransientException. For SQLRecoverableException
+ * the caller needs to reconnect to the database and restart the whole transaction.
+ *
+ * @param conn a JDBC connection.
+ * @param stmt the SQL statement to be prepared.
+ * @param waitWindow The base time window (in milliseconds) before the next retry;
+ * see {@link #getRandomWaitTime} for details.
+ * @param maxRetries the maximum # of retries when getting a SQLTransientException.
+ * @throws SQLException throws SQLRecoverableException or SQLNonTransientException the
+ * first time it is caught, or SQLTransientException when maxRetries has been reached.
+ */
+ public static PreparedStatement prepareWithRetry(Connection conn, String stmt,
+ int waitWindow, int maxRetries) throws SQLException {
+
+ Random r = new Random();
+
+ // retry with # of maxRetries before throwing exception
+ for (int failures = 0; ; failures++) {
+ try {
+ return conn.prepareStatement(stmt);
+ } catch (SQLTransientException e) {
+ if (failures >= maxRetries) {
+ LOG.error("Error preparing JDBC Statement " + stmt + " :" + e);
+ throw e;
+ }
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
+ }
+ } catch (SQLException e) {
+ // just throw other types (SQLNonTransientException / SQLRecoverableException)
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Introduces a random factor into the wait time before another retry.
+ * The wait time depends on the number of failures so far and a random factor.
+ * The first time an exception is caught, the wait time is a random number
+ * between 0 and baseWindow msec. If the first retry still fails, we wait a
+ * baseWindow msec grace period before the 2nd retry. Also at the second retry,
+ * the waiting window is expanded to 2*baseWindow msec, alleviating the request
+ * rate on the server. Similarly the 3rd retry waits a 2*baseWindow msec grace
+ * period before retrying, and the waiting window is expanded to 3*baseWindow
+ * msec, and so on.
+ * @param baseWindow the base waiting window.
+ * @param failures number of failures so far.
+ * @param r a random generator.
+ * @return number of milliseconds for the next wait time.
+ */
+ public static long getRandomWaitTime(int baseWindow, int failures, Random r) {
+ return (long) (
+ baseWindow * failures + // grace period for the last round of attempt
+ baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure
+ }
}
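
Taken together, the three helpers above let a caller connect, prepare, and execute with transient-failure retries. A minimal sketch of a hypothetical caller (the Derby connection string, query, and retry settings are illustrative, not mandated by the patch):

  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import org.apache.hadoop.hive.ql.exec.Utilities;

  public class RetryHelpersDemo {
    public static void main(String[] args) throws SQLException {
      // hypothetical embedded-Derby connection string; waitWindow = 3000 ms, maxRetries = 3
      Connection conn = Utilities.connectWithRetry(
          "jdbc:derby:;databaseName=TempStatsStore;create=true", 3000, 3);
      PreparedStatement stmt = Utilities.prepareWithRetry(
          conn, "SELECT 1 FROM SYSIBM.SYSDUMMY1", 3000, 3);
      // wrap the execution in a SQLCommand so executeWithRetry can re-run it
      Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
        @Override
        public ResultSet run(PreparedStatement s) throws SQLException {
          return s.executeQuery();
        }
      };
      ResultSet rs = Utilities.executeWithRetry(execQuery, stmt, 3000, 3);
      System.out.println(rs.next());
      // SQLRecoverableException still propagates; callers such as the stats
      // publisher/aggregator below reconnect and restart when they catch it.
    }
  }

This is the same pattern JDBCStatsPublisher.connect() and JDBCStatsAggregator.connect() follow below.
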
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (revision 1096632)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (working copy)
@@ -24,12 +24,15 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLRecoverableException;
import java.sql.Statement;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -40,56 +43,92 @@
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private PreparedStatement selStmt, updStmt, insStmt;
- private int timeout = 30; // default timeout in sec. for JDBC connection and statements
+ private int timeout; // timeout in sec. for JDBC connection and statements
// SQL comment that identifies where the SQL statement comes from
private final String comment = "Hive stats publishing: " + this.getClass().getName();
+ private int maxRetries, waitWindow;
+ private final Random r;
-
public JDBCStatsPublisher() {
- selStmt = updStmt = insStmt = null;
+ r = new Random();
}
+ @Override
public boolean connect(Configuration hiveconf) {
+ this.hiveconf = hiveconf;
+ maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+ waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+ connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+ String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+
try {
- this.hiveconf = hiveconf;
- connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
- timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
- String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- DriverManager.setLoginTimeout(timeout); // stats is non-blocking
- conn = DriverManager.getConnection(connectionString);
+ } catch (Exception e) {
+ LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
+ return false;
+ }
- // prepare the SELECT/UPDATE/INSERT statements
- String select =
- "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+ // prepare the SELECT/UPDATE/INSERT statements
+ String select =
+ "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
+ " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
- String update =
- "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+ String update =
+ "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
- String insert =
- "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " VALUES (?, ?)";
+ String insert =
+ "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " VALUES (?, ?)";
- selStmt = conn.prepareStatement(select);
- updStmt = conn.prepareStatement(update);
- insStmt = conn.prepareStatement(insert);
+ DriverManager.setLoginTimeout(timeout); // stats is non-blocking
- // make the statements non-blocking
- selStmt.setQueryTimeout(timeout);
- updStmt.setQueryTimeout(timeout);
- insStmt.setQueryTimeout(timeout);
+ // function pointer for executeWithRetry to setQueryTimeout
+ Utilities.SQLCommand<Void> setQueryTimeout = new Utilities.SQLCommand<Void>() {
+ @Override
+ public Void run(PreparedStatement stmt) throws SQLException {
+ stmt.setQueryTimeout(timeout);
+ return null;
+ }
+ };
- return true;
- } catch (Exception e) {
- LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
- return false;
+ for (int failures = 0; ; failures++) {
+ try {
+ conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
+
+ // prepare statements
+ selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries);
+ updStmt = Utilities.prepareWithRetry(conn, update, waitWindow, maxRetries);
+ insStmt = Utilities.prepareWithRetry(conn, insert, waitWindow, maxRetries);
+
+ // set query timeout
+ Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries);
+ Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries);
+ Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries);
+
+ return true;
+ } catch (SQLRecoverableException e) {
+ if (failures >= maxRetries) {
+ LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
+ return false; // just return false without failing the task
+ }
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
+ }
+ } catch (SQLException e) {
+ // for SQLTransientException (maxRetries already reached in the Utilities retry functions)
+ // or SQLNonTransientException, declare a real failure
+ return false;
+ }
}
}
+ @Override
public boolean publishStat(String fileID, String statType, String value) {
if (conn == null) {
@@ -104,34 +143,70 @@
}
LOG.info("Stats publishing for key " + fileID + ". Value = " + value);
- try {
+ Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
+ @Override
+ public ResultSet run(PreparedStatement stmt) throws SQLException {
+ return stmt.executeQuery();
+ }
+ };
- // Check to see if a previous task (mapper attempt) had published a previous stat
- selStmt.setString(1, fileID);
- ResultSet result = selStmt.executeQuery();
+ Utilities.SQLCommand<Void> execUpdate = new Utilities.SQLCommand<Void>() {
+ @Override
+ public Void run(PreparedStatement stmt) throws SQLException {
+ stmt.executeUpdate();
+ return null;
+ }
+ };
- if (result.next()) {
- long currval = result.getLong(1);
- // Only update if the previous value is smaller (i.e. the previous attempt was a fail and
- // hopefully this attempt is a success (as it has a greater value).
- if (currval < Long.parseLong(value)) {
- updStmt.setString(1, value);
- updStmt.setString(2, fileID);
- updStmt.executeUpdate();
+ for (int failures = 0; ; failures++) {
+ try {
+
+ // Check to see if a previous task (mapper attempt) had published a previous stat
+ selStmt.setString(1, fileID);
+ ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries);
+
+ if (result.next()) {
+ long currval = result.getLong(1);
+ // Only update if the previous value is smaller (i.e. the previous attempt failed
+ // and hopefully this attempt succeeds, as it has a greater value).
+ if (currval < Long.parseLong(value)) {
+ updStmt.setString(1, value);
+ updStmt.setString(2, fileID);
+ Utilities.executeWithRetry(execUpdate, updStmt, waitWindow, maxRetries);
+ }
+ } else {
+ // No previous attempts.
+ insStmt.setString(1, fileID);
+ insStmt.setString(2, value);
+ Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries);
}
- } else {
- // No previous attempts.
- insStmt.setString(1, fileID);
- insStmt.setString(2, value);
- insStmt.executeUpdate();
+ return true;
+ } catch (SQLRecoverableException e) {
+ // need to start from scratch (connection)
+ if (failures >= maxRetries) {
+ return false;
+ }
+ // close the current connection
+ closeConnection();
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException iex) {
+ }
+ // get a new connection
+ if (!connect(hiveconf)) {
+ // if cannot reconnect, just fail because connect() already handles retries.
+ LOG.error("Error during publishing aggregation. " + e);
+ return false;
+ }
+ } catch (SQLException e) {
+ LOG.error("Error during publishing statistics. ", e);
+ return false;
}
- return true;
- } catch (SQLException e) {
- LOG.error("Error during publishing statistics. ", e);
- return false;
}
}
+ @Override
public boolean closeConnection() {
if (conn == null) {
return true;
@@ -167,6 +242,11 @@
}
}
+ /**
+ * Initialize the intermediate stats DB the first time it is used (e.g.,
+ * creating tables).
+ */
+ @Override
public boolean init(Configuration hconf) {
try {
this.hiveconf = hconf;
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (revision 1096632)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (working copy)
@@ -20,14 +20,18 @@
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLRecoverableException;
import java.sql.Statement;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -36,25 +40,94 @@
private Connection conn;
private String connectionString;
private Configuration hiveconf;
+ private PreparedStatement selStmt, delStmt;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private int timeout = 30;
private final String comment = "Hive stats aggregation: " + this.getClass().getName();
+ private int maxRetries, waitWindow;
+ private final Random r;
+ public JDBCStatsAggregator() {
+ r = new Random();
+ }
+
+ @Override
public boolean connect(Configuration hiveconf) {
+ this.hiveconf = hiveconf;
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+ connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+ maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
+ waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+
try {
- this.hiveconf = hiveconf;
- timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
- connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
- String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- // stats is non-blocking -- throw an exception when timeout
- DriverManager.setLoginTimeout(timeout);
- conn = DriverManager.getConnection(connectionString);
- return true;
} catch (Exception e) {
- LOG.error("Error during JDBC connection. " + e);
+ LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
return false;
}
+
+ // prepare the SELECT/DELETE statements
+ String select =
+ "SELECT /* " + comment + " */ " +
+ " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
+ " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+ /* Automatic Cleaning:
+ IMPORTANT: Since we publish and aggregate only 1 value (1 column), which is the row count, it
+ is valid to delete the row after aggregation (automatic cleaning) because we know that there are no
+ other values to aggregate.
+ If, in the future, other values are aggregated and published, then we cannot do cleaning except
+ when we are sure that all values are aggregated, or we can move the cleaning into a separate
+ method which the developer has to call manually in the code.
+ */
+ String delete =
+ "DELETE /* " + comment + " */ " +
+ " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE ?";
+
+ // stats is non-blocking -- throw an exception when timeout
+ DriverManager.setLoginTimeout(timeout);
+ // function pointer for executeWithRetry to setQueryTimeout
+ Utilities.SQLCommand<Void> setQueryTimeout = new Utilities.SQLCommand<Void>() {
+ @Override
+ public Void run(PreparedStatement stmt) throws SQLException {
+ stmt.setQueryTimeout(timeout);
+ return null;
+ }
+ };
+
+ // retry connection and statement preparations
+ for (int failures = 0; ; failures++) {
+ try {
+ conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
+
+ // prepare statements
+ selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries);
+ delStmt = Utilities.prepareWithRetry(conn, delete, waitWindow, maxRetries);
+
+ // set query timeout
+ Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries);
+ Utilities.executeWithRetry(setQueryTimeout, delStmt, waitWindow, maxRetries);
+
+ return true;
+ } catch (SQLRecoverableException e) {
+ if (failures >= maxRetries) {
+ LOG.error("Error during JDBC connection and preparing statement: " + e);
+ return false;
+ }
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e1) {
+ }
+ } catch (SQLException e) {
+ // for SQLTransientException (maxRetries already reached in the Utilities retry functions)
+ // or SQLNonTransientException, declare a real failure
+ return false;
+ }
+ }
}
@Override
@@ -68,49 +141,77 @@
return null;
}
- try {
- long retval = 0;
- Statement stmt = conn.createStatement();
- stmt.setQueryTimeout(timeout);
- String select =
- "SELECT /* " + comment + " */ " +
- " SUM(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
+ Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
+ @Override
+ public ResultSet run(PreparedStatement stmt) throws SQLException {
+ return stmt.executeQuery();
+ }
+ };
- ResultSet result = stmt.executeQuery(select);
- if (result.next()) {
- retval = result.getLong(1);
- } else {
- LOG.warn("Warning. Nothing published. Nothing aggregated.");
- return "";
+ Utilities.SQLCommand<Void> execUpdate = new Utilities.SQLCommand<Void>() {
+ @Override
+ public Void run(PreparedStatement stmt) throws SQLException {
+ stmt.executeUpdate();
+ return null;
}
- stmt.clearBatch();
+ };
- /* Automatic Cleaning:
- IMPORTANT: Since we publish and aggregate only 1 value (1 column) which is the row count, it
- is valid to delete the row after aggregation (automatic cleaning) because we know that there is no
- other values to aggregate.
- If ;in the future; other values are aggregated and published, then we cannot do cleaning except
- when we are sure that all values are aggregated, or we can separate the implementation of cleaning
- through a separate method which the developer has to call it manually in the code.
- */
- String delete =
- "DELETE /* " + comment + " */ " +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " LIKE '" + fileID + "%'";
- stmt.executeUpdate(delete);
- stmt.close();
+ for (int failures = 0; ; failures++) {
+ try {
+ long retval = 0;
- LOG.info("Stats aggregator got " + retval);
+ String keyPrefix = fileID + "%";
+ selStmt.setString(1, keyPrefix);
+ ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries);
+ if (result.next()) {
+ retval = result.getLong(1);
+ } else {
+ LOG.warn("Warning. Nothing published. Nothing aggregated.");
+ return "";
+ }
- return Long.toString(retval);
- } catch (SQLException e) {
- LOG.error("Error during publishing aggregation. " + e);
- return null;
+ /* Automatic Cleaning:
+ IMPORTANT: Since we publish and aggregate only 1 value (1 column), which is the row count, it
+ is valid to delete the row after aggregation (automatic cleaning) because we know that there are no
+ other values to aggregate.
+ If, in the future, other values are aggregated and published, then we cannot do cleaning except
+ when we are sure that all values are aggregated, or we can move the cleaning into a separate
+ method which the developer has to call manually in the code.
+ */
+ delStmt.setString(1, keyPrefix);
+ Utilities.executeWithRetry(execUpdate, delStmt, waitWindow, maxRetries);
+
+ LOG.info("Stats aggregator got " + retval);
+
+ return Long.toString(retval);
+ } catch (SQLRecoverableException e) {
+ // need to start from scratch (connection)
+ if (failures >= maxRetries) {
+ return null;
+ }
+ // close the current connection
+ closeConnection();
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException iex) {
+ }
+ // getting a new connection
+ if (!connect(hiveconf)) {
+ // if cannot reconnect, just fail because connect() already handles retries.
+ LOG.error("Error during publishing aggregation. " + e);
+ return null;
+ }
+ } catch (SQLException e) {
+ // for SQLTransientException (already handled by the Utilities.*WithRetry() functions)
+ // and SQLNonTransientException, just declare failure.
+ LOG.error("Error during publishing aggregation. " + e);
+ return null;
+ }
}
}
+ @Override
public boolean closeConnection() {
if (conn == null) {
@@ -136,6 +237,7 @@
}
}
+ @Override
public boolean cleanUp(String rowID) {
try {
Statement stmt = conn.createStatement();