diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 5ab3ebb..903883d 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -343,7 +343,10 @@ private static void createPartitionIfNotExists(HiveEndPoint ep,
     if (ep.partitionVals.isEmpty()) {
       return;
     }
-    SessionState state = SessionState.start(new CliSessionState(conf));
+    SessionState localSession = null;
+    if(SessionState.get() == null) {
+      localSession = SessionState.start(new CliSessionState(conf));
+    }
     Driver driver = new Driver(conf);
     try {
@@ -372,7 +375,9 @@ private static void createPartitionIfNotExists(HiveEndPoint ep,
     } finally {
       driver.close();
       try {
-        state.close();
+        if(localSession != null) {
+          localSession.close();
+        }
       } catch (IOException e) {
         LOG.warn("Error closing SessionState used to run Hive DDL.");
       }
@@ -563,11 +568,14 @@ private void beginNextTransactionImpl() throws TransactionError {
     /**
      * Get Id of currently open transaction
-     * @return
+     * @return -1 if there is no open TX
      */
     @Override
    public Long getCurrentTxnId() {
-      return txnIds.get(currentTxnIndex);
+      if(currentTxnIndex >= 0) {
+        return txnIds.get(currentTxnIndex);
+      }
+      return -1L;
     }
     /**
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index e02caec..ec99728 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -54,6 +54,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -67,6 +69,7 @@ public class TestStreaming {
+  private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
   public static class RawFileSystem extends RawLocalFileSystem {
     private static final URI NAME;
@@ -636,18 +639,25 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
     connection.close();
   }
-  class WriterThd extends Thread {
+  private static class WriterThd extends Thread {
-    private StreamingConnection conn;
-    private HiveEndPoint ep;
-    private DelimitedInputWriter writer;
-    private String data;
+    private final StreamingConnection conn;
+    private final DelimitedInputWriter writer;
+    private final String data;
+    private Throwable error;
     WriterThd(HiveEndPoint ep, String data) throws Exception {
-      this.ep = ep;
+      super("Writer_" + data);
       writer = new DelimitedInputWriter(fieldNames, ",", ep);
       conn = ep.newConnection(false);
       this.data = data;
+      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread thread, Throwable throwable) {
+          error = throwable;
+          LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
+        }
+      });
     }
     @Override
@@ -668,14 +678,14 @@ public void run() {
         try {
           txnBatch.close();
         } catch (Exception e) {
+          LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
           conn.close();
-          throw new RuntimeException(e);
         }
       }
       try {
         conn.close();
       } catch (Exception e) {
-        throw new RuntimeException(e);
+        LOG.error("conn.close() failed: " + e.getMessage(), e);
       }
     }
@@ -685,18 +695,23 @@ public void run() {
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
     final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    WriterThd t1 = new WriterThd(ep, "1,Matrix");
-    WriterThd t2 = new WriterThd(ep, "2,Gandhi");
-    WriterThd t3 = new WriterThd(ep, "3,Silence");
-
-    t1.start();
-    t2.start();
-    t3.start();
-
-    t1.join();
-    t2.join();
-    t3.join();
+    List<WriterThd> writers = new ArrayList<WriterThd>(3);
+    writers.add(new WriterThd(ep, "1,Matrix"));
+    writers.add(new WriterThd(ep, "2,Gandhi"));
+    writers.add(new WriterThd(ep, "3,Silence"));
+    for(WriterThd w : writers) {
+      w.start();
+    }
+    for(WriterThd w : writers) {
+      w.join();
+    }
+    for(WriterThd w : writers) {
+      if(w.error != null) {
+        Assert.assertFalse("Writer thread " + w.getName() + " died: " + w.error.getMessage() +
+          " See log file for stack trace", true);
+      }
+    }
   }
 
   // delete db and all tables in it
diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml
index be8d11f..077631a 100644
--- itests/hive-unit/pom.xml
+++ itests/hive-unit/pom.xml
@@ -53,6 +53,16 @@
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
new file mode 100644
index 0000000..9807497
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -0,0 +1,310 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class TestCompactor {
+  private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class);
+  private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
+    File.separator + TestCompactor.class.getCanonicalName() + "-" + System.currentTimeMillis());
+  private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+
+  @Rule
+  public TemporaryFolder stagingFolder = new TemporaryFolder();
+  private HiveConf conf;
+  IMetaStoreClient msClient;
+  private Driver driver;
+
+  @Before
+  public void setup() throws Exception {
+
+    File f = new File(TEST_WAREHOUSE_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if(!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+    }
+
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
+
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+
+    conf = hiveConf;
+    msClient = new HiveMetaStoreClient(conf);
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+
+
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+    int k = 0;
+    for (int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for (int j = 1; j <= LOOP_SIZE; j++) {
+        String sj = "S" + j + "S";
+        input[k] = si + "\t" + sj;
+        k++;
+      }
+    }
+    createTestDataFile(BASIC_FILE_NAME, input);
+  }
+  @After
+  public void tearDown() {
+    conf = null;
+    if(msClient != null) {
+      msClient.close();
+    }
+    if(driver != null) {
+      driver.close();
+    }
+  }
+
+  /**
+   * After each major compaction, stats need to be updated on each column of the
+   * table/partition which previously had stats.
+   * 1. create a bucketed ORC backed table (Orc is currently required by ACID)
+   * 2. populate 2 partitions with data
+   * 3. compute stats
+   * 4. insert some data into the table using StreamingAPI
+   * 5. Trigger major compaction (which should update stats)
+   * 6. check that stats have been updated
+   * @throws Exception
+   * todo:
+   * 2. add non-partitioned test
+   * 4. add a test with sorted table?
+   */
+  @Test
+  public void testStatsAfterCompactionPartTbl() throws Exception {
+    //as of (8/27/2014) Hive 0.14, ACID/Orc requires HiveInputFormat
+    String tblName = "compaction_test";
+    String tblNameStg = tblName + "_stg";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("drop table if exists " + tblNameStg, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(bkt INT)" +
+      " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC", driver);
+    executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
+      " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
+      " STORED AS TEXTFILE" +
+      " LOCATION '" + stagingFolder.newFolder() + "'", driver);
+
+    executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
+      "' overwrite into table " + tblNameStg, driver);
+    execSelectAndDumpData("select * from " + tblNameStg, driver, "Dumping data for " +
+      tblNameStg + " after load:");
+    executeStatementOnDriver("FROM " + tblNameStg +
+      " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=0) " +
+      "SELECT a, b where a < 2", driver);
+    executeStatementOnDriver("FROM " + tblNameStg +
+      " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=1) " +
+      "SELECT a, b where a >= 2", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+      tblName + " after load:");
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
+    LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+    Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
+      System.getProperty("user.name"));
+    su.gatherStats();//compute stats before compaction
+    LOG.debug("List of stats columns after analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+
+    CompactionInfo ciPart2 = new CompactionInfo("default", tblName, "bkt=1", CompactionType.MAJOR);
+    LOG.debug("List of stats columns before analyze Part2: " + txnHandler.findColumnsWithStats(ci));
+    su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"));
+    su.gatherStats();//compute stats before compaction
+    LOG.debug("List of stats columns after analyze Part2: " + txnHandler.findColumnsWithStats(ci));
+
+    //now make sure we get the stats we expect for partition we are going to add data to later
+    Map<String, List<ColumnStatisticsObj>> stats = msClient.getPartitionColumnStatistics(ci.dbname,
+      ci.tableName, Arrays.asList(ci.partName), colNames);
+    List<ColumnStatisticsObj> colStats = stats.get(ci.partName);
+    Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+    Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+    Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+    LongColumnStatsData colAStats = colStats.get(0).getStatsData().getLongStats();
+    Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+    Assert.assertEquals("highValue a", 1, colAStats.getHighValue());
+    Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+    Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs());
+    StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats();
+    Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen());
Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen()); + Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls()); + Assert.assertEquals("nunDVs", 2, colBStats.getNumDVs()); + + //now save stats for partition we won't modify + stats = msClient.getPartitionColumnStatistics(ciPart2.dbname, + ciPart2.tableName, Arrays.asList(ciPart2.partName), colNames); + colStats = stats.get(ciPart2.partName); + LongColumnStatsData colAStatsPart2 = colStats.get(0).getStatsData().getLongStats(); + StringColumnStatsData colBStatsPart2 = colStats.get(1).getStatsData().getStringStats(); + + + HiveEndPoint endPt = new HiveEndPoint(null, ci.dbname, ci.tableName, Arrays.asList("0")); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + /*next call will eventually end up in HiveEndPoint.createPartitionIfNotExists() which + makes an operation on Driver + * and starts it's own CliSessionState and then closes it, which removes it from ThreadLoacal; + * thus the session + * created in this class is gone after this; I fixed it in HiveEndPoint*/ + StreamingConnection connection = endPt.newConnection(true); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState()); + txnBatch.write("50,Kiev".getBytes()); + txnBatch.write("51,St. Petersburg".getBytes()); + txnBatch.write("44,Boston".getBytes()); + txnBatch.commit(); + + txnBatch.beginNextTransaction(); + txnBatch.write("52,Tel Aviv".getBytes()); + txnBatch.write("53,Atlantis".getBytes()); + txnBatch.write("53,Boston".getBytes()); + txnBatch.commit(); + + txnBatch.close(); + connection.close(); + execSelectAndDumpData("select * from " + ci.getFullTableName(), driver, ci.getFullTableName()); + + //so now we have written some new data to bkt=0 and it shows up + CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, CompactionType.MAJOR); + rqst.setPartitionname(ci.partName); + txnHandler.compact(rqst); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); + stop.boolVal = true; + t.init(stop); + t.run(); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName, + Arrays.asList(ci.partName), colNames); + colStats = stats.get(ci.partName); + Assert.assertNotNull("No stats found for partition " + ci.partName, colStats); + Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName()); + Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName()); + colAStats = colStats.get(0).getStatsData().getLongStats(); + Assert.assertEquals("lowValue a", 1, colAStats.getLowValue()); + Assert.assertEquals("highValue a", 53, colAStats.getHighValue()); + Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls()); + Assert.assertEquals("numNdv a", 6, colAStats.getNumDVs()); + colBStats = colStats.get(1).getStatsData().getStringStats(); + Assert.assertEquals("maxColLen b", 14, colBStats.getMaxColLen()); + //cast it to long to get rid of periodic decimal + Assert.assertEquals("avgColLen b", (long)6.1111111111, (long)colBStats.getAvgColLen()); + 
Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls()); + Assert.assertEquals("nunDVs", 10, colBStats.getNumDVs()); + + //now check that stats for partition we didn't modify did not change + stats = msClient.getPartitionColumnStatistics(ciPart2.dbname, ciPart2.tableName, + Arrays.asList(ciPart2.partName), colNames); + colStats = stats.get(ciPart2.partName); + Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same", + colAStatsPart2, colStats.get(0).getStatsData().getLongStats()); + Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same", + colBStatsPart2, colStats.get(1).getStatsData().getStringStats()); + } + + /** + * convenience method to execute a select stmt and dump results to log file + */ + private static void execSelectAndDumpData(String selectStmt, Driver driver, String msg) + throws Exception { + executeStatementOnDriver(selectStmt, driver); + ArrayList valuesReadFromHiveDriver = new ArrayList(); + driver.getResults(valuesReadFromHiveDriver); + int rowIdx = 0; + LOG.debug(msg); + for(String row : valuesReadFromHiveDriver) { + LOG.debug(" rowIdx=" + rowIdx++ + ":" + row); + } + } + /** + * Execute Hive CLI statement + * @param cmd arbitrary statement to execute + */ + static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException { + LOG.debug("Executing: " + cmd); + CommandProcessorResponse cpr = driver.run(cmd); + if(cpr.getResponseCode() != 0) { + throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr); + } + } + static void createTestDataFile(String filename, String[] lines) throws IOException { + FileWriter writer = null; + try { + File file = new File(filename); + file.deleteOnExit(); + writer = new FileWriter(file); + for (String line : lines) { + writer.write(line + "\n"); + } + } finally { + if (writer != null) { + writer.close(); + } + } + + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index a818300..d249db0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -34,9 +34,17 @@ private String fullPartitionName = null; private String fullTableName = null; + public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) { + this.dbname = dbname; + this.tableName = tableName; + this.partName = partName; + this.type = type; + } + CompactionInfo() {} + public String getFullPartitionName() { if (fullPartitionName == null) { - StringBuffer buf = new StringBuffer(dbname); + StringBuilder buf = new StringBuilder(dbname); buf.append('.'); buf.append(tableName); if (partName != null) { @@ -50,11 +58,14 @@ public String getFullPartitionName() { public String getFullTableName() { if (fullTableName == null) { - StringBuffer buf = new StringBuffer(dbname); + StringBuilder buf = new StringBuilder(dbname); buf.append('.'); buf.append(tableName); fullTableName = buf.toString(); } return fullTableName; } + public boolean isMajorCompaction() { + return CompactionType.MAJOR == type; + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 524a7a4..d3aa66f 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ 
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -535,6 +535,46 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
         deadlockCnt = 0;
       }
     }
+
+  /**
+   * Queries metastore DB directly to find columns in the table which have statistics information.
+   * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+   * table level stats are examined.
+   * @throws MetaException
+   */
+  public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = dbConn.createStatement();
+      String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")
+        + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+        + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");
+      LOG.debug("Going to execute <" + s + ">");
+      rs = stmt.executeQuery(s);
+      List<String> columns = new ArrayList<String>();
+      while(rs.next()) {
+        columns.add(rs.getString(1));
+      }
+      LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+        (ci.partName == null ? "" : "/" + ci.partName));
+      dbConn.commit();
+      return columns;
+    } catch (SQLException e) {
+      try {
+        LOG.error("Failed to find columns to analyze stats on for " + ci.tableName +
+          (ci.partName == null ? "" : "/" + ci.partName), e);
+        dbConn.rollback();
+      } catch (SQLException e1) {
+        //nothing we can do here
+      }
+      throw new MetaException("Unable to connect to transaction database " +
+        StringUtils.stringifyException(e));
+    } finally {
+      close(rs, stmt, dbConn);
+    }
+  }
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 063dee6..3c6cddd 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -859,6 +859,29 @@ protected void closeStmt(Statement stmt) {
   }
 
   /**
+   * Close the ResultSet.
+   * @param rs may be {@code null}
+   */
+  void close(ResultSet rs) {
+    try {
+      if (rs != null && !rs.isClosed()) {
+        rs.close();
+      }
+    }
+    catch(SQLException ex) {
+      LOG.warn("Failed to close result set " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
+   */
+  void close(ResultSet rs, Statement stmt, Connection dbConn) {
+    close(rs);
+    closeStmt(stmt);
+    closeDbConn(dbConn);
+  }
+  /**
    * Determine if an exception was a deadlock.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
    * different database.
diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
index 7c173d3..4584517 100644
--- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
+++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
@@ -76,4 +76,9 @@ public CommandProcessorResponse(int responseCode, String errorMessage, String SQ
   public String getSQLState() { return SQLState; }
   public Schema getSchema() { return resSchema; }
   public Throwable getException() { return exception; }
+  public String toString() {
+    return "(" + responseCode + "," + errorMessage + "," + SQLState +
+      (resSchema == null ? "" : ",") +
"" : exception.getMessage()) + ")"; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 7780679..93d1bc0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -100,7 +100,7 @@ public CompactorMR() { * @throws java.io.IOException if the job fails */ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns, boolean isMajor) throws IOException { + ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException { JobConf job = new JobConf(conf); job.setJobName(jobName); job.setOutputKeyClass(NullWritable.class); @@ -182,6 +182,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, LOG.debug("Setting maximume transaction to " + maxTxn); JobClient.runJob(job).waitForCompletion(); + su.gatherStats(); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index f464df8..347bf65 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -20,20 +20,28 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** * A class to do compactions. This will run in a separate thread. It will spin on the @@ -110,7 +118,7 @@ public void run() { continue; } - final boolean isMajor = (ci.type == CompactionType.MAJOR); + final boolean isMajor = ci.isMajorCompaction(); final ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); final StringBuffer jobName = new StringBuffer(name); @@ -129,17 +137,19 @@ public void run() { LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName()); + final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf, + runJobAsSelf(runAs) ? 
         final CompactorMR mr = new CompactorMR();
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+            mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+                mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
                 return null;
               }
             });
@@ -161,11 +171,95 @@ public Object run() throws Exception {
   public void init(BooleanPointer stop) throws MetaException {
     super.init(stop);
 
-    StringBuffer name = new StringBuffer(hostname());
+    StringBuilder name = new StringBuilder(hostname());
     name.append("-");
     name.append(getId());
     this.name = name.toString();
     setName(name.toString());
   }
+
+  static final class StatsUpdater {
+    static final private Log LOG = LogFactory.getLog(StatsUpdater.class);
+
+    public static StatsUpdater init(CompactionInfo ci, List<String> columnListForStats,
+      HiveConf conf, String userName) {
+      return new StatsUpdater(ci, columnListForStats, conf, userName);
+    }
+    /**
+     * List of columns for which to compute stats.  This may be empty, which means no stats
+     * gathering is needed.
+     */
+    private final List<String> columnList;
+    private final HiveConf conf;
+    private final String userName;
+    private final CompactionInfo ci;
+
+    private StatsUpdater(CompactionInfo ci, List<String> columnListForStats,
+      HiveConf conf, String userName) {
+      this.conf = conf;
+      this.userName = userName;
+      this.ci = ci;
+      if(!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) {
+        columnList = Collections.emptyList();
+        return;
+      }
+      columnList = columnListForStats;
+    }
+
+    /**
+     * todo: what should this do on failure?  Should it rethrow?  Invalidate stats?
+     */
+    void gatherStats() throws IOException {
+      if(!ci.isMajorCompaction()) {
+        return;
+      }
+      if(columnList.isEmpty()) {
+        LOG.debug("No existing stats for " + ci.dbname + "." + ci.tableName + " found.  Will not run analyze.");
+        return;//nothing to do
+      }
+      //e.g. analyze table page_view partition(dt='10/15/2014',country='US')
+      // compute statistics for columns viewtime
+      StringBuilder sb = new StringBuilder("analyze table ").append(ci.dbname).append(".").append(ci.tableName);
+      if(ci.partName != null) {
+        try {
+          sb.append(" partition(");
+          Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
+          for(Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
+            sb.append(ent.getKey()).append("='").append(ent.getValue()).append("'");
+          }
+          sb.append(")");
+        }
+        catch(MetaException ex) {
+          throw new IOException(ex);
+        }
+      }
+      sb.append(" compute statistics for columns ");
+      for(String colName : columnList) {
+        sb.append(colName).append(",");
+      }
+      sb.setLength(sb.length() - 1);//remove trailing ,
+      LOG.debug("running '" + sb.toString() + "'");
+      Driver d = new Driver(conf, userName);
+      SessionState localSession = null;
+      if(SessionState.get() == null) {
+        localSession = SessionState.start(new SessionState(conf));
+      }
+      try {
+        CommandProcessorResponse cpr = d.run(sb.toString());
+        if (cpr.getResponseCode() != 0) {
+          throw new IOException("Could not update stats for table " + ci.getFullTableName() +
+            (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cpr);
"" : "/" + ci.partName) + " due to: " + cpr); + } + } + catch(CommandNeedRetryException cnre) { + throw new IOException("Could not update stats for table " + ci.getFullTableName() + + (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cnre.getMessage()); + } + finally { + if(localSession != null) { + localSession.close(); + } + } + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 1b9469d..90a722b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -280,7 +279,7 @@ public void minorTableWithBase() throws Exception { // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); -for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString()); + for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString()); Assert.assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents @@ -507,7 +506,7 @@ public void majorTableNoBase() throws Exception { Assert.assertEquals(1, compacts.size()); Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - // There should still now be 5 directories in the location + // There should now be 3 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); Assert.assertEquals(3, stat.length);