diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index b09e7ae..9597d80 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -349,7 +349,10 @@ private static void createPartitionIfNotExists(HiveEndPoint ep,
if (ep.partitionVals.isEmpty()) {
return;
}
- SessionState state = SessionState.start(new CliSessionState(conf));
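+ // Reuse the caller's SessionState if one is already active on this thread; only create
+ // (and later close) a local one when none exists.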
+ SessionState localSession = null;
+ if(SessionState.get() == null) {
+ localSession = SessionState.start(new CliSessionState(conf));
+ }
Driver driver = new Driver(conf);
try {
@@ -378,7 +381,9 @@ private static void createPartitionIfNotExists(HiveEndPoint ep,
} finally {
driver.close();
try {
- state.close();
+ if(localSession != null) {
+ localSession.close();
+ }
} catch (IOException e) {
LOG.warn("Error closing SessionState used to run Hive DDL.");
}
diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml
index be8d11f..077631a 100644
--- itests/hive-unit/pom.xml
+++ itests/hive-unit/pom.xml
@@ -53,6 +53,16 @@
<artifactId>hive-exec</artifactId>
<version>${project.version}</version>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-streaming</artifactId>
+ <version>${project.version}</version>
+ </dependency>
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
new file mode 100644
index 0000000..998e69e
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -0,0 +1,311 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests the ACID compactor; in particular verifies that column statistics are recomputed
+ * after a major compaction.
+ */
+public class TestCompactor {
+ private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class);
+ private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
+ File.separator + TestCompactor.class.getCanonicalName() + "-" + System.currentTimeMillis());
+ private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+
+ @Rule
+ public TemporaryFolder stagingFolder = new TemporaryFolder();
+ private HiveConf conf;
+ IMetaStoreClient msClient;
+ private Driver driver;
+
+ @Before
+ public void setup() throws Exception {
+
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if(!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+ throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+ }
+
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
+
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ conf = hiveConf;
+ msClient = new HiveMetaStoreClient(conf);
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+
+
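+ // Build a small LOOP_SIZE x LOOP_SIZE data set of tab-separated rows of the form "<i>\tS<j>S".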
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+ int k = 0;
+ for (int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for (int j = 1; j <= LOOP_SIZE; j++) {
+ String sj = "S" + j + "S";
+ input[k] = si + "\t" + sj;
+ k++;
+ }
+ }
+ createTestDataFile(BASIC_FILE_NAME, input);
+ }
+ @After
+ public void tearDown() {
+ conf = null;
+ if(msClient != null) {
+ msClient.close();
+ }
+ if(driver != null) {
+ driver.close();
+ }
+ }
+
+ /**
+ * After each major compaction, stats need to be updated on each column of the
+ * table/partition which previously had stats.
+ * 1. create a bucketed ORC backed table (Orc is currently required by ACID)
+ * 2. populate 2 partitions with data
+ * 3. compute stats
+ * 4. insert some data into the table using StreamingAPI
+ * 5. Trigger major compaction (which should update stats)
+ * 6. check that stats have been updated
+ * @throws Exception
+ * todo:
+ * 2. add non-partitioned test
+ * 4. add a test with sorted table?
+ */
+ @Test
+ public void testStatsAfterCompactionPartTbl() throws Exception {
+ //as of (8/27/2014) Hive 0.14, ACID/Orc requires HiveInputFormat
+ String tblName = "compaction_test";
+ String tblNameStg = tblName + "_stg";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("drop table if exists " + tblNameStg, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(bkt INT)" +
+ " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC", driver);
+ executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
+ " STORED AS TEXTFILE" +
+ " LOCATION '" + stagingFolder.newFolder() + "'", driver);
+
+ executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
+ "' overwrite into table " + tblNameStg, driver);
+ execSelectAndDumpData("select * from " + tblNameStg, driver, "Dumping data for " +
+ tblNameStg + " after load:");
+ executeStatementOnDriver("FROM " + tblNameStg +
+ " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=0) " +
+ "SELECT a, b where a < 2", driver);
+ executeStatementOnDriver("FROM " + tblNameStg +
+ " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=1) " +
+ "SELECT a, b where a >= 2", driver);
+ execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+ tblName + " after load:");
+
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
+ LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+ Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
+ System.getProperty("user.name"));
+ su.gatherStats();//compute stats before compaction
+ LOG.debug("List of stats columns after analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+
+ CompactionInfo ciPart2 = new CompactionInfo("default", tblName, "bkt=1", CompactionType.MAJOR);
+ LOG.debug("List of stats columns before analyze Part2: " + txnHandler.findColumnsWithStats(ci));
+ su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"));
+ su.gatherStats();//compute stats before compaction
+ LOG.debug("List of stats columns after analyze Part2: " + txnHandler.findColumnsWithStats(ci));
+
+ //now make sure we get the stats we expect for partition we are going to add data to later
+ Map<String, List<ColumnStatisticsObj>> stats = msClient.getPartitionColumnStatistics(ci.dbname,
+ ci.tableName, Arrays.asList(ci.partName), colNames);
+ List<ColumnStatisticsObj> colStats = stats.get(ci.partName);
+ Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+ Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+ Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+ LongColumnStatsData colAStats = colStats.get(0).getStatsData().getLongStats();
+ Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+ Assert.assertEquals("highValue a", 1, colAStats.getHighValue());
+ Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+ Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs());
+ StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats();
+ Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen());
+ Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen());
+ Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
+ Assert.assertEquals("nunDVs", 2, colBStats.getNumDVs());
+
+ //now save stats for partition we won't modify
+ stats = msClient.getPartitionColumnStatistics(ciPart2.dbname,
+ ciPart2.tableName, Arrays.asList(ciPart2.partName), colNames);
+ colStats = stats.get(ciPart2.partName);
+ LongColumnStatsData colAStatsPart2 = colStats.get(0).getStatsData().getLongStats();
+ StringColumnStatsData colBStatsPart2 = colStats.get(1).getStatsData().getStringStats();
+
+
+ HiveEndPoint endPt = new HiveEndPoint(null, ci.dbname, ci.tableName, Arrays.asList("0"));
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ /* The next call eventually reaches HiveEndPoint.createPartitionIfNotExists(), which runs an
+ * operation on a Driver. Previously it started its own CliSessionState and then closed it,
+ * removing it from the ThreadLocal and thereby destroying the session created in this class;
+ * HiveEndPoint now only creates (and closes) a session when none already exists. */
+ StreamingConnection connection = endPt.newConnection(true);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
+ txnBatch.write("50,Kiev".getBytes());
+ txnBatch.write("51,St. Petersburg".getBytes());
+ txnBatch.write("44,Boston".getBytes());
+ txnBatch.commit();
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("52,Tel Aviv".getBytes());
+ txnBatch.write("53,Atlantis".getBytes());
+ txnBatch.write("53,Boston".getBytes());
+ txnBatch.commit();
+
+ txnBatch.close();
+ connection.close();
+ execSelectAndDumpData("select * from " + ci.getFullTableName(), driver, ci.getFullTableName());
+
+ //so now we have written some new data to bkt=0 and it shows up
+ CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, CompactionType.MAJOR);
+ rqst.setPartitionname(ci.partName);
+ txnHandler.compact(rqst);
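+ // Run the compactor Worker inline in this thread; the stop flag is set before init() so the
+ // worker makes a single pass over the queued request and then exits rather than looping.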
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+ stop.boolVal = true;
+ t.init(stop);
+ t.run();
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+ stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
+ Arrays.asList(ci.partName), colNames);
+ colStats = stats.get(ci.partName);
+ Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+ Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+ Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+ colAStats = colStats.get(0).getStatsData().getLongStats();
+ Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+ Assert.assertEquals("highValue a", 53, colAStats.getHighValue());
+ Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+ Assert.assertEquals("numNdv a", 6, colAStats.getNumDVs());
+ colBStats = colStats.get(1).getStatsData().getStringStats();
+ Assert.assertEquals("maxColLen b", 14, colBStats.getMaxColLen());
+ //cast to long to avoid comparing the repeating decimal exactly
+ Assert.assertEquals("avgColLen b", (long)6.1111111111, (long)colBStats.getAvgColLen());
+ Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
+ Assert.assertEquals("nunDVs", 10, colBStats.getNumDVs());
+
+ //now check that stats for partition we didn't modify did not change
+ stats = msClient.getPartitionColumnStatistics(ciPart2.dbname, ciPart2.tableName,
+ Arrays.asList(ciPart2.partName), colNames);
+ colStats = stats.get(ciPart2.partName);
+ Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same",
+ colAStatsPart2, colStats.get(0).getStatsData().getLongStats());
+ Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same",
+ colBStatsPart2, colStats.get(1).getStatsData().getStringStats());
+ }
+
+ /**
+ * convenience method to execute a select stmt and dump results to log file
+ */
+ private static void execSelectAndDumpData(String selectStmt, Driver driver, String msg)
+ throws Exception {
+ executeStatementOnDriver(selectStmt, driver);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ int rowIdx = 0;
+ LOG.debug(msg);
+ for(String row : valuesReadFromHiveDriver) {
+ LOG.debug(" rowIdx=" + rowIdx++ + ":" + row);
+ }
+ }
+ /**
+ * Execute Hive CLI statement
+ * @param cmd arbitrary statement to execute
+ */
+ static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
+ LOG.debug("Executing: " + cmd);
+ CommandProcessorResponse cpr = driver.run(cmd);
+ if(cpr.getResponseCode() != 0) {
+ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+ }
+ }
+ static void createTestDataFile(String filename, String[] lines) throws IOException {
+ FileWriter writer = null;
+ try {
+ File file = new File(filename);
+ file.deleteOnExit();
+ writer = new FileWriter(file);
+ for (String line : lines) {
+ writer.write(line + "\n");
+ }
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ }
+}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index a818300..d249db0 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -34,9 +34,17 @@
private String fullPartitionName = null;
private String fullTableName = null;
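+ /** Constructs a fully-specified compaction descriptor; {@code partName} may be null for table-level compactions. */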
+ public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) {
+ this.dbname = dbname;
+ this.tableName = tableName;
+ this.partName = partName;
+ this.type = type;
+ }
+ CompactionInfo() {}
+
public String getFullPartitionName() {
if (fullPartitionName == null) {
- StringBuffer buf = new StringBuffer(dbname);
+ StringBuilder buf = new StringBuilder(dbname);
buf.append('.');
buf.append(tableName);
if (partName != null) {
@@ -50,11 +58,14 @@ public String getFullPartitionName() {
public String getFullTableName() {
if (fullTableName == null) {
- StringBuffer buf = new StringBuffer(dbname);
+ StringBuilder buf = new StringBuilder(dbname);
buf.append('.');
buf.append(tableName);
fullTableName = buf.toString();
}
return fullTableName;
}
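+ /** @return true if this is a {@link CompactionType#MAJOR} compaction, false otherwise */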
+ public boolean isMajorCompaction() {
+ return CompactionType.MAJOR == type;
+ }
}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 524a7a4..ce0937d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -535,6 +535,46 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
deadlockCnt = 0;
}
}
+
+ /**
+ * Queries metastore DB directly to find columns in the table which have statistics information.
+ * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+ * table level stats are examined.
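+ * @return list of names of columns that have statistics stored (possibly empty)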
+ * @throws MetaException
+ */
+ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ stmt = dbConn.createStatement();
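+ // Table-level stats live in TAB_COL_STATS and partition-level stats in PART_COL_STATS;
+ // choose the table based on whether a partition name was supplied.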
+ String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")
+ + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+ + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");
+ LOG.debug("Going to execute <" + s + ">");
+ rs = stmt.executeQuery(s);
+ List<String> columns = new ArrayList<String>();
+ while(rs.next()) {
+ columns.add(rs.getString(1));
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return columns;
+ } catch (SQLException e) {
+ try {
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ } catch (SQLException e1) {
+ //nothing we can do here
+ }
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ }
}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 063dee6..3c6cddd 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -859,6 +859,29 @@ protected void closeStmt(Statement stmt) {
}
/**
+ * Close the ResultSet.
+ * @param rs may be {@code null}
+ */
+ void close(ResultSet rs) {
+ try {
+ if (rs != null && !rs.isClosed()) {
+ rs.close();
+ }
+ }
+ catch(SQLException ex) {
+ LOG.warn("Failed to close statement " + ex.getMessage());
+ }
+ }
+
+ /**
+ * Close all three JDBC artifacts in order: {@code rs}, {@code stmt}, {@code dbConn}.
+ */
+ void close(ResultSet rs, Statement stmt, Connection dbConn) {
+ close(rs);
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ /**
* Determine if an exception was a deadlock. Unfortunately there is no standard way to do
* this, so we have to inspect the error messages and catch the telltale signs for each
* different database.
diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
index 7c173d3..4584517 100644
--- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
+++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
@@ -76,4 +76,9 @@ public CommandProcessorResponse(int responseCode, String errorMessage, String SQ
public String getSQLState() { return SQLState; }
public Schema getSchema() { return resSchema; }
public Throwable getException() { return exception; }
+ public String toString() {
+ return "(" + responseCode + "," + errorMessage + "," + SQLState +
+ (resSchema == null ? "" : ",") +
+ (exception == null ? "" : exception.getMessage()) + ")";
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 1a83c64..d3a1179 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -100,7 +100,7 @@ public CompactorMR() {
* @throws java.io.IOException if the job fails
*/
void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, boolean isMajor) throws IOException {
+ ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException {
JobConf job = new JobConf(conf);
job.setJobName(jobName);
job.setOutputKeyClass(NullWritable.class);
@@ -182,6 +182,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
LOG.debug("Setting maximume transaction to " + maxTxn);
JobClient.runJob(job).waitForCompletion();
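+ // Re-gather column statistics for the compacted table/partition now that the compaction job has finished.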
+ su.gatherStats();
}
/**
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index f464df8..8e6de3a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -20,20 +20,28 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
/**
* A class to do compactions. This will run in a separate thread. It will spin on the
@@ -110,7 +118,7 @@ public void run() {
continue;
}
- final boolean isMajor = (ci.type == CompactionType.MAJOR);
+ final boolean isMajor = ci.isMajorCompaction();
final ValidTxnList txns =
TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
final StringBuffer jobName = new StringBuffer(name);
@@ -129,17 +137,19 @@ public void run() {
LOG.info("Starting " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName());
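+ // Collect the columns that already have stats so the StatsUpdater can recompute them
+ // once the compaction job completes (see CompactorMR.run()).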
+ final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf,
+ runJobAsSelf(runAs) ? runAs : t.getOwner());
final CompactorMR mr = new CompactorMR();
try {
if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+ mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction