diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 95fa6641f2..c21e74e377 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -26,15 +26,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1305,6 +1297,55 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception
     assertEquals(0, deleteDeltaStat.length);
   }
 
+  @Test
+  public void testCompactionForFileInScratchDir() throws Exception {
+    String dbName = "default";
+    String tblName = "cfs";
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    String createQuery = "CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS ORC " +
+        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver(createQuery, driver);
+
+    // Insert some data -> this will generate only insert deltas
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
+
+    // Insert some more data -> this will again generate only insert deltas
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(2, 'bar')", driver);
+
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    // Point the compactor's tmp dir at a scratch dir under the table location
+    String scratchDirPath = table.getSd().getLocation() + "/_tmp";
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("compactor.hive.compactor.input.tmp.dir", scratchDirPath);
+
+    // Create an empty bucket file in the scratch dir, simulating a leftover of a failed task attempt
+    Path dir = new Path(scratchDirPath + "/base_0000002_v0000005");
+    fs.mkdirs(dir);
+    Path emptyFile = AcidUtils.createBucketFile(dir, 0);
+    fs.create(emptyFile).close();
+
+    // Run major compaction
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    cr.setProperties(tblProperties);
+    txnHandler.compact(cr);
+    t.run();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+  }
+
   @Test
   public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
     minorCompactWhileStreamingWithSplitUpdate(false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 543ec0b991..f26d4bb2c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -839,7 +839,19 @@ private void getWriter(Reporter reporter, ObjectInspector inspector,
       AcidOutputFormat aof =
           instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
 
-      writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+      Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+      cleanupTmpLocationOnTaskRetry(options, rootDir);
+      writer = aof.getRawRecordWriter(rootDir, options);
+    }
+  }
+
+  // Remove any file left in the tmp location by a previous (failed) task attempt so the retry can recreate it
+  private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir) throws IOException {
+    Path tmpLocation = AcidUtils.createFilename(rootDir, options);
+    FileSystem fs = tmpLocation.getFileSystem(jobConf);
+
+    if (fs.exists(tmpLocation)) {
+      fs.delete(tmpLocation, true);
     }
   }