Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java (revision 1165899) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java (working copy) @@ -297,7 +297,7 @@ assertEquals("1000", rows0); usize0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.RAW_DATA_SIZE); - assertEquals("0", usize0); + assertEquals("2000", usize0); assertTrue(statsAggregator.cleanUp("file_0000")); @@ -313,8 +313,98 @@ } } + /** Publishes 201 growing updates under each of two keys; the aggregate must equal the sum of the largest value per key (500 rows, raw data size 5000). */ + public void testStatsPublisherManyUpdatesSubsetStatistics() throws Throwable { + try { + System.out + .println("StatsPublisher - (many updates + publishing subset of supported statistics)"); + // instantiate stats publisher + StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf); + assertNotNull(statsPublisher); + assertTrue(statsPublisher.init(conf)); + assertTrue(statsPublisher.connect(conf)); + // instantiate stats aggregator + StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); + assertNotNull(statsAggregator); + assertTrue(statsAggregator.connect(conf)); + + // publish stats + for (int i = 0; i <= 200; i ++) { + fillStatMap(Integer.toString(i), Integer.toString(i * 10)); + assertTrue(statsPublisher.publishStat("file_00000_a", stats)); + fillStatMap(Integer.toString(i + 100), Integer.toString((i + 100) * 10)); + assertTrue(statsPublisher.publishStat("file_00000_b", stats)); + } + + // aggregate existing stats + String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT); + assertEquals("500", rows0); + String usize0 = statsAggregator.aggregateStats("file_00000", + StatsSetupConst.RAW_DATA_SIZE); + assertEquals("5000", usize0); + + assertTrue(statsAggregator.cleanUp("file_0000")); + + // close connections + assertTrue(statsPublisher.closeConnection()); + 
assertTrue(statsAggregator.closeConnection()); + + System.out + .println("StatsPublisher - (many updates + publishing subset of supported statistics) - OK"); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + + /** Publishes one update under each of 400 distinct keys; the aggregate must sum every row (600 rows, raw data size 6000). */ + public void testStatsPublisherManyUpdates() throws Throwable { + try { + System.out + .println("StatsPublisher - (many updates)"); + + // instantiate stats publisher + StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf); + assertNotNull(statsPublisher); + assertTrue(statsPublisher.init(conf)); + assertTrue(statsPublisher.connect(conf)); + + // instantiate stats aggregator + StatsAggregator statsAggregator = StatsFactory.getStatsAggregator(); + assertNotNull(statsAggregator); + assertTrue(statsAggregator.connect(conf)); + + // publish stats + for (int i = 0; i < 200; i ++) { + fillStatMap("1", "10"); + assertTrue(statsPublisher.publishStat("file_00000_a" + i, stats)); + fillStatMap("2", "20"); + assertTrue(statsPublisher.publishStat("file_00000_b" + i, stats)); + } + + // aggregate existing stats + String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT); + assertEquals("600", rows0); + String usize0 = statsAggregator.aggregateStats("file_00000", + StatsSetupConst.RAW_DATA_SIZE); + assertEquals("6000", usize0); + + assertTrue(statsAggregator.cleanUp("file_0000")); + + // close connections + assertTrue(statsPublisher.closeConnection()); + assertTrue(statsAggregator.closeConnection()); + + System.out + .println("StatsPublisher - (many updates) - OK"); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + public void testStatsAggregatorCleanUp() throws Throwable { try { System.out.println("StatsAggregator - clean-up"); Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (revision 1165899) +++ 
ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (working copy) @@ -24,7 +24,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLRecoverableException; import java.sql.Statement; import java.util.List; @@ -44,7 +43,7 @@ private String connectionString; private Configuration hiveconf; private final Log LOG = LogFactory.getLog(this.getClass().getName()); - private PreparedStatement updStmt, insStmt; + private PreparedStatement insStmt; private int timeout; // default timeout in sec. for JDBC connection and statements // SQL comment that identifies where the SQL statement comes from private final String comment = "Hive stats publishing: " + this.getClass().getName(); @@ -87,13 +86,10 @@ conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries); // prepare statements - updStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getUpdate(comment), waitWindow, - maxRetries); insStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getInsert(comment), waitWindow, maxRetries); // set query timeout - Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries); Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries); @@ -148,36 +144,6 @@ } Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries); return true; - } catch (SQLIntegrityConstraintViolationException e) { - - // We assume that the table used for partial statistics has a primary key declared on the - // "fileID". 
The exception will be thrown if two tasks report results for the same fileID. - // In such case, we either update the row, or abandon changes depending on which statistic - // is newer. - - for (int updateFailures = 0;; updateFailures++) { - try { - int i; - for (i = 0; i < JDBCStatsUtils.getSupportedStatistics().size(); i++) { - updStmt.setString(i + 1, stats.get(supportedStatistics.get(i))); - } - updStmt.setString(supportedStatistics.size() + 1, fileID); - updStmt.setString(supportedStatistics.size() + 2, - stats.get(JDBCStatsUtils.getBasicStat())); - updStmt.setString(supportedStatistics.size() + 3, fileID); - Utilities.executeWithRetry(execUpdate, updStmt, waitWindow, maxRetries); - return true; - } catch (SQLRecoverableException ue) { - // need to start from scratch (connection) - if (!handleSQLRecoverableException(ue, updateFailures)) { - return false; - } - } catch (SQLException ue) { - LOG.error("Error during publishing statistics. ", e); - return false; - } - } - } catch (SQLRecoverableException e) { // need to start from scratch (connection) if (!handleSQLRecoverableException(e, failures)) { @@ -216,9 +182,6 @@ return true; } try { - if (updStmt != null) { - updStmt.close(); - } if (insStmt != null) { insStmt.close(); } Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java (revision 1165899) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java (working copy) @@ -21,7 +21,7 @@ public static final String PART_STAT_ID_COLUMN_NAME = "ID"; - public static final String PART_STAT_TABLE_NAME = "PARTITION_STATS"; + public static final String PART_STAT_TABLE_NAME = "PARTITION_STATISTICS"; // supported statistics - column names Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java 
=================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java (revision 1165899) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java (working copy) @@ -104,7 +104,7 @@ */ public static String getCreate(String comment) { String create = "CREATE TABLE /* " + comment + " */ " + JDBCStatsUtils.getStatTableName() + - " (" + JDBCStatsUtils.getIdColumnName() + " VARCHAR(255) PRIMARY KEY "; + " (" + JDBCStatsUtils.getIdColumnName() + " VARCHAR(255) "; for (int i = 0; i < supportedStats.size(); i++) { create += ", " + getStatColumnName(supportedStats.get(i)) + " BIGINT "; } @@ -113,22 +113,6 @@ } /** - * Prepares UPDATE statement issued when updating existing statistics - */ - public static String getUpdate(String comment) { - String update = "UPDATE /* " + comment + " */ " + getStatTableName() + " SET "; - for (int i = 0; i < supportedStats.size(); i++) { - update += columnNameMapping.get(supportedStats.get(i)) + " = ? , "; - } - update = update.substring(0, update.length() - 2); - update += " WHERE " + JDBCStatsUtils.getIdColumnName() + " = ? AND ? > ( SELECT TEMP." - + getStatColumnName(getBasicStat()) + " FROM ( " + - " SELECT " + getStatColumnName(getBasicStat()) + " FROM " + getStatTableName() + " WHERE " - + getIdColumnName() + " = ? ) TEMP )"; - return update; - } - - /** * Prepares INSERT statement for statistic publishing. */ public static String getInsert(String comment) { @@ -151,8 +135,10 @@ */ public static String getSelectAggr(String statType, String comment) { String select = "SELECT /* " + comment + " */ " + "SUM( " - + getStatColumnName(statType) + " ) " + " FROM " - + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + " LIKE ? 
ESCAPE ?"; + + getStatColumnName(statType) + " ) FROM ( SELECT MAX( " + getStatColumnName(statType) + + " ) AS " + getStatColumnName(statType) + " FROM " + getStatTableName() + " WHERE " + + JDBCStatsUtils.getIdColumnName() + " LIKE ? ESCAPE ? GROUP BY " + + JDBCStatsUtils.getIdColumnName() + " ) AS t1"; return select; }