diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 26f5c7e248..165d245659 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -295,7 +295,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
   }
 
   /**
-   * @param baseDir if not null, it's either table/partition root folder or base_xxxx.  
+   * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    * If it's base_xxxx, it's in dirsToSearch, else the actual original files
    * (all leaves recursively) are in the dirsToSearch list
    */
@@ -915,13 +915,33 @@ public void commitJob(JobContext context) throws IOException {
       FileSystem fs = tmpLocation.getFileSystem(conf);
       LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
           finalLocation.toString());
-
+      if(!fs.exists(tmpLocation)) {
+        /**
+         * No 'tmpLocation' may happen if the job generated 0 splits, which happens if all
+         * input delta and/or base files were empty or had
+         * only {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} files.
+         * So make sure the new base/delta is created.
+         */
+        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+          .writingBase(conf.getBoolean(IS_MAJOR, false))
+          .isCompressed(conf.getBoolean(IS_COMPRESSED, false))
+          .minimumTransactionId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
+          .maximumTransactionId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
+          .bucket(0)
+          .statementId(-1);
+        Path emptyFile = new Path(AcidUtils.createFilename(finalLocation, options).getParent(),
+          ".empty_file");//we only need the directory but there is no such API
+        LOG.info(context.getJobID() + ": " + tmpLocation +
+          " not found.  Assuming 0 splits.  Creating " + emptyFile);
+        fs.create(emptyFile, false).close();
+        return;
+      }
       FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
       //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
       //name is that we want to rename; leave it for another day
-      for (int i = 0; i < contents.length; i++) {
-        Path newPath = new Path(finalLocation, contents[i].getPath().getName());
-        fs.rename(contents[i].getPath(), newPath);
+      for (FileStatus fileStatus : contents) {
+        Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
+        fs.rename(fileStatus.getPath(), newPath);
       }
       fs.delete(tmpLocation, true);
     }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index ac447797d5..bbf6c86b1a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hive.ql;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -697,9 +700,9 @@ public void testCompactStatsGather() throws Exception {
     map = hms.getPartitionColumnStatistics("default","T", partNames, colNames);
     Assert.assertEquals("", 5, map.get(partNames.get(0)).get(0).getStatsData().getLongStats().getHighValue());
   }
-  @Ignore("enable after HIVE-18294")
   @Test
   public void testDefault() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
     runStatementOnDriver("drop table if exists T");
     runStatementOnDriver("create table T (a int, b int) stored as orc");
     runStatementOnDriver("insert into T values(1,2),(3,4)");
@@ -711,6 +714,49 @@ public void testDefault() throws Exception {
       {"{\"transactionid\":15,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000015_0000015_0000/bucket_00000"}
     };
     checkExpected(rs, expected, "insert data");
+  }
+  /**
+   * see HIVE-18429
+   */
+  @Test
+  public void testEmptyCompactionResult() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as orc");
+    int[][] data = {{1,2}, {3,4}};
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+
+    //delete the bucket files so now we have empty delta dirs
+    List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
+    FileSystem fs = FileSystem.get(hiveConf);
+    for(String path : rs) {
+      fs.delete(new Path(path), true);
+    }
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    //check status of compaction job
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    //now run another compaction to make sure empty dirs don't cause issues
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    //check status of compaction job
+    resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
+    for(int i = 0; i < 2; i++) {
+      Assert.assertEquals("Unexpected compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(i).getState());
+      Assert.assertTrue(resp.getCompacts().get(i).getHadoopJobId().startsWith("job_local"));
+    }
+    rs = runStatementOnDriver("select a, b from T order by a, b");
+    Assert.assertEquals(stringifyValues(data), rs);
   }
 }