Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1067256)
+++ conf/hive-default.xml (working copy)
@@ -734,6 +734,30 @@
+ hive.stats.default.publisher
+
+ The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is not JDBC or HBase.
+
+
+
+ hive.stats.default.aggregator
+
+ The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is not JDBC or HBase.
+
+
+
+ hive.stats.jdbc.timeout
+ 30
+ Timeout value (number of seconds) used by JDBC connection and statements.
+
+
+
+ hive.stats.atomic
+ false
+ If this is set to true then the metastore stats will be updated only if all types of stats (# of rows, # of files, # of bytes etc.) are available. Otherwise metastore stats are updated in a best-effort fashion with whatever is available.
+
+
+
hive.support.concurrency
false
Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.
Index: build-common.xml
===================================================================
--- build-common.xml (revision 1067256)
+++ build-common.xml (working copy)
@@ -54,7 +54,7 @@
-
+
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1067256)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -325,6 +325,10 @@
""), // default stats publisher if none of JDBC/HBase is specified
HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator",
""), // default stats aggregator if none of JDBC/HBase is specified
+ HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout",
+ 30), // default timeout in sec for JDBC connection & SQL statements
+ HIVE_STATS_ATOMIC("hive.stats.atomic",
+ false), // whether to update metastore stats only if all stats are available
// Concurrency
@@ -368,7 +372,7 @@
HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false),
- HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false),
+ HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false),
;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1067256)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -773,8 +773,13 @@
// Initializing a stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
- if (statsPublisher == null || !statsPublisher.connect(hconf)) {
+ if (statsPublisher == null) {
// just return, stats gathering should not block the main query
+ LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
+ return;
+ }
+ if (!statsPublisher.connect(hconf)) {
+ // just return, stats gathering should not block the main query
LOG.error("StatsPublishing error: cannot connect to database");
return;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (revision 1067256)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (working copy)
@@ -226,6 +226,11 @@
}
private int aggregateStats() {
+
+ String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ StatsFactory.setImplementation(statsImplementationClass, conf);
+ StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+
try {
// Stats setup:
Warehouse wh = new Warehouse(conf);
@@ -233,19 +238,10 @@
FileStatus[] fileStatus;
// manufacture a StatsAggregator
- StatsAggregator statsAggregator;
- String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsFactory.setImplementation(statsImplementationClass, conf);
- statsAggregator = StatsFactory.getStatsAggregator();
if (!statsAggregator.connect(conf)) {
- // this should not fail the whole job, return 0 so that the job won't fail.
- console.printInfo("[WARNING] Could not update table/partition level stats.",
- "StatsAggregator.connect() failed: stats class = " +
- statsImplementationClass);
- return 0;
+ throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
}
-
TableStatistics tblStats = new TableStatistics();
//
@@ -287,6 +283,10 @@
String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
if (rows != null) {
tblStats.setNumRows(Long.parseLong(rows));
+ } else {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
+ throw new HiveException("StatsAggregator failed to get numRows.");
+ }
}
} else {
// Partitioned table:
@@ -304,6 +304,10 @@
String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
if (rows != null) {
newPartStats.setNumRows(Long.parseLong(rows));
+ } else {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
+ throw new HiveException("StatsAggregator failed to get numRows.");
+ }
}
fileSys = partn.getPartitionPath().getFileSystem(conf);
@@ -358,7 +362,6 @@
}
}
- statsAggregator.closeConnection();
//
// write table stats to metastore
@@ -375,15 +378,16 @@
console.printInfo("Table " + table.getTableName() + " stats: [" + tblStats.toString() + ']');
- return 0;
- }
- catch (Exception e) {
+ } catch (Exception e) {
// return 0 since StatsTask should not fail the whole job
console.printInfo("[Warning] could not update stats.",
"Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e));
- return 0;
+ } finally {
+ statsAggregator.closeConnection();
}
+ // StatsTask always return 0 so that the whole job won't fail
+ return 0;
}
/**
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (revision 1067256)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (working copy)
@@ -40,6 +40,7 @@
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private PreparedStatement selStmt, updStmt, insStmt;
+ private int timeout = 30; // default timeout in sec. for JDBC connection and statements
public JDBCStatsPublisher() {
selStmt = updStmt = insStmt = null;
@@ -49,9 +50,10 @@
try {
this.hiveconf = hiveconf;
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- DriverManager.setLoginTimeout(3); // stats should not block
+ DriverManager.setLoginTimeout(timeout); // stats is non-blocking
conn = DriverManager.getConnection(connectionString);
// prepare the SELECT/UPDATE/INSERT statements
@@ -74,9 +76,9 @@
insStmt = conn.prepareStatement(insert);
// make the statements non-blocking
- selStmt.setQueryTimeout(5);
- updStmt.setQueryTimeout(5);
- insStmt.setQueryTimeout(5);
+ selStmt.setQueryTimeout(timeout);
+ updStmt.setQueryTimeout(timeout);
+ insStmt.setQueryTimeout(timeout);
return true;
} catch (Exception e) {
@@ -168,9 +170,11 @@
connectionString = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
String driver = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
+ DriverManager.setLoginTimeout(timeout);
conn = DriverManager.getConnection(connectionString);
Statement stmt = conn.createStatement();
+ stmt.setQueryTimeout(timeout);
// Check if the table exists
DatabaseMetaData dbm = conn.getMetaData();
Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (revision 1067256)
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (working copy)
@@ -37,14 +37,17 @@
private String connectionString;
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
+ private int timeout = 30;
public boolean connect(Configuration hiveconf) {
try {
this.hiveconf = hiveconf;
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- DriverManager.setLoginTimeout(3); // stats should not block
+ // stats is non-blocking -- throw an exception when timeout
+ DriverManager.setLoginTimeout(timeout);
conn = DriverManager.getConnection(connectionString);
return true;
} catch (Exception e) {
@@ -67,6 +70,7 @@
try {
long retval = 0;
Statement stmt = conn.createStatement();
+ stmt.setQueryTimeout(timeout);
String select =
"SELECT SUM" + "(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
" FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +