Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1067256) +++ conf/hive-default.xml (working copy) @@ -734,6 +734,30 @@ + hive.stats.default.publisher + + The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is not JDBC or HBase. + + + + hive.stats.default.aggregator + + The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is not JDBC or HBase. + + + + hive.stats.jdbc.timeout + 30 + Timeout value (number of seconds) used by JDBC connection and statements. + + + + hive.stats.atomic + false + If this is set to true then the metastore stats will be updated only if all types of stats (# of rows, # of files, # of bytes etc.) are available. Otherwise metastore stats are updated in a best effort fashion with whatever are available. + + + hive.support.concurrency false Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks. 
Index: build-common.xml =================================================================== --- build-common.xml (revision 1067256) +++ build-common.xml (working copy) @@ -54,7 +54,7 @@ - + Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1067256) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -325,6 +325,10 @@ ""), // default stats publisher if none of JDBC/HBase is specified HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator", ""), // default stats aggregator if none of JDBC/HBase is specified + HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout", + 30), // default timeout in sec for JDBC connection & SQL statements + HIVE_STATS_ATOMIC("hive.stats.atomic", + false), // whether to update metastore stats only if all stats are available // Concurrency @@ -368,7 +372,7 @@ HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false), - HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false), + HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false), ; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1067256) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -773,8 +773,13 @@ // Initializing a stats publisher StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc); - if (statsPublisher == null || !statsPublisher.connect(hconf)) { + if (statsPublisher == null) { // just return, stats gathering should not block the main query + LOG.error("StatsPublishing error: StatsPublisher is not initialized."); + return; + } + if (!statsPublisher.connect(hconf)) { + // just return, stats gathering should not block the main query 
LOG.error("StatsPublishing error: cannot connect to database"); return; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (revision 1067256) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (working copy) @@ -226,6 +226,11 @@ } private int aggregateStats() { + + String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); + StatsFactory.setImplementation(statsImplementationClass, conf); + StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); + try { // Stats setup: Warehouse wh = new Warehouse(conf); @@ -233,19 +238,10 @@ FileStatus[] fileStatus; // manufacture a StatsAggregator - StatsAggregator statsAggregator; - String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); - StatsFactory.setImplementation(statsImplementationClass, conf); - statsAggregator = StatsFactory.getStatsAggregator(); if (!statsAggregator.connect(conf)) { - // this should not fail the whole job, return 0 so that the job won't fail. 
- console.printInfo("[WARNING] Could not update table/partition level stats.", - "StatsAggregator.connect() failed: stats class = " + - statsImplementationClass); - return 0; + throw new HiveException("StatsAggregator connect failed " + statsImplementationClass); } - TableStatistics tblStats = new TableStatistics(); // @@ -287,6 +283,10 @@ String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT); if (rows != null) { tblStats.setNumRows(Long.parseLong(rows)); + } else { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) { + throw new HiveException("StatsAggregator failed to get numRows."); + } } } else { // Partitioned table: @@ -304,6 +304,10 @@ String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT); if (rows != null) { newPartStats.setNumRows(Long.parseLong(rows)); + } else { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) { + throw new HiveException("StatsAggregator failed to get numRows."); + } } fileSys = partn.getPartitionPath().getFileSystem(conf); @@ -358,7 +362,6 @@ } } - statsAggregator.closeConnection(); // // write table stats to metastore @@ -375,15 +378,18 @@ console.printInfo("Table " + table.getTableName() + " stats: [" + tblStats.toString() + ']'); - return 0; - } - catch (Exception e) { + } catch (Exception e) { // return 0 since StatsTask should not fail the whole job console.printInfo("[Warning] could not update stats.", "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e)); - return 0; + } finally { + if (statsAggregator != null) { + statsAggregator.closeConnection(); + } } + // StatsTask always return 0 so that the whole job won't fail + return 0; } /** Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (revision 1067256) +++ 
ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (working copy) @@ -40,6 +40,7 @@ private Configuration hiveconf; private final Log LOG = LogFactory.getLog(this.getClass().getName()); private PreparedStatement selStmt, updStmt, insStmt; + private int timeout = 30; // default timeout in sec. for JDBC connection and statements public JDBCStatsPublisher() { selStmt = updStmt = insStmt = null; @@ -49,9 +50,10 @@ try { this.hiveconf = hiveconf; connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); + timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT); String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); Class.forName(driver).newInstance(); - DriverManager.setLoginTimeout(3); // stats should not block + DriverManager.setLoginTimeout(timeout); // stats is non-blocking conn = DriverManager.getConnection(connectionString); // prepare the SELECT/UPDATE/INSERT statements @@ -74,9 +76,9 @@ insStmt = conn.prepareStatement(insert); // make the statements non-blocking - selStmt.setQueryTimeout(5); - updStmt.setQueryTimeout(5); - insStmt.setQueryTimeout(5); + selStmt.setQueryTimeout(timeout); + updStmt.setQueryTimeout(timeout); + insStmt.setQueryTimeout(timeout); return true; } catch (Exception e) { @@ -168,9 +170,11 @@ connectionString = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); String driver = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); Class.forName(driver).newInstance(); + DriverManager.setLoginTimeout(timeout); conn = DriverManager.getConnection(connectionString); Statement stmt = conn.createStatement(); + stmt.setQueryTimeout(timeout); // Check if the table exists DatabaseMetaData dbm = conn.getMetaData(); Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java =================================================================== --- 
ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (revision 1067256) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (working copy) @@ -37,14 +37,17 @@ private String connectionString; private Configuration hiveconf; private final Log LOG = LogFactory.getLog(this.getClass().getName()); + private int timeout = 30; public boolean connect(Configuration hiveconf) { try { this.hiveconf = hiveconf; + timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT); connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); Class.forName(driver).newInstance(); - DriverManager.setLoginTimeout(3); // stats should not block + // stats is non-blocking -- throw an exception when timeout + DriverManager.setLoginTimeout(timeout); conn = DriverManager.getConnection(connectionString); return true; } catch (Exception e) { @@ -67,6 +70,7 @@ try { long retval = 0; Statement stmt = conn.createStatement(); + stmt.setQueryTimeout(timeout); String select = "SELECT SUM" + "(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" + " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +