diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index ba2b3f3311..a1c466236f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -3036,6 +3038,55 @@ public void testSkipTables() throws IOException {
     verifyIfTableNotExist(dbName + "_dupe", "acid_table_incremental", metaStoreClientMirror);
   }
 
+  @Test
+  public void testDeleteStagingDir() throws IOException {
+    String testName = "deleteStagingDir";
+    String dbName = createDB(testName, driver);
+    String tableName = "unptned";
+    run("CREATE TABLE " + dbName + "." + tableName + "(a string) STORED AS TEXTFILE", driver);
+
+    String[] unptn_data = new String[] {"one", "two"};
+    String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+    createTestDataFile(unptn_locn, unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver);
+
+    // Perform repl
+    advanceDumpDir();
+    run("REPL DUMP " + dbName, driver);
+    String replDumpLocn = getResult(0, 0, driver);
+    // Reset the driver
+    driverMirror.close();
+    driverMirror.init();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+    // Calling close() explicitly to clean up the staging dirs
+    driverMirror.close();
+    // Check result
+    Path warehouse = new Path(System.getProperty("test.warehouse.dir", "/tmp"));
+    FileSystem fs = FileSystem.get(warehouse.toUri(), hconf);
+    try {
+      Path path = new Path(warehouse, dbName + "_dupe.db" + Path.SEPARATOR + tableName);
+      // First check if the table dir exists (could have been deleted for some reason in pre-commit tests)
+      if (!fs.exists(path))
+      {
+        return;
+      }
+      PathFilter filter = new PathFilter()
+      {
+        @Override
+        public boolean accept(Path path)
+        {
+          return path.getName().startsWith(HiveConf.getVar(hconf, HiveConf.ConfVars.STAGINGDIR));
+        }
+      };
+      FileStatus[] statuses = fs.listStatus(path, filter);
+      assertEquals(0, statuses.length);
+    } catch (IOException e) {
+      LOG.error("Failed to list files in: " + warehouse, e);
+      assert(false);
+    }
+  }
+
   private static String createDB(String name, Driver myDriver) {
     LOG.info("Testing " + name);
     String dbName = name + "_" + tid;
@@ -3283,6 +3334,15 @@ private void verifyIfPartitionExist(String dbName, String tableName, List<String> partValues,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ ... @@
+  public Map<String, Path> getFsScratchDirs() {
+    return fsScratchDirs;
+  }
 
   public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 706d0b68be..ca2e99256c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -19,7 +19,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -42,7 +41,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
 import java.io.Serializable;
@@ -209,6 +207,9 @@ a database ( directory )
       }
       this.childTasks = scope.rootTasks;
       LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+
+      // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later
+      driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
     } catch (Exception e) {
       LOG.error("failed replication", e);
       setException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java
index f9f3750090..f42f632b83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java
@@ -44,6 +44,10 @@ public PathInfo(HiveConf hiveConf) {
     stagingDir = HiveConf.getVar(hiveConf, HiveConf.ConfVars.STAGINGDIR);
   }
 
+  public Map<String, Path> getFsScratchDirs() {
+    return fsScratchDirs;
+  }
+
   Path computeStagingDir(Path inputPath) {
     final URI inputPathUri = inputPath.toUri();
     final String inputPathName = inputPathUri.getPath();
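Note on why the one-line ReplLoadTask change suffices: in the Hive code this patch targets, Driver#close() tears down its Context, and Context#clear() removes every path registered in fsScratchDirs through removeScratchDir(). Bootstrap REPL LOAD computes its staging paths through the repl-side PathInfo rather than the driver's Context, so those dirs were never registered and never deleted; the putAll() call hands them over. The sketch below is a minimal, self-contained illustration of that hand-off, not Hive API: ScratchDirSketch, removeScratchDirs(), the String-valued map and the sample path are hypothetical stand-ins, and real Hive uses Map<String, Path> with a recursive FileSystem delete.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for Hive's Context/PathInfo pair; illustrates the
    // scratch-dir hand-off this patch adds, with the actual delete stubbed out.
    public class ScratchDirSketch {
      // Mirrors the fsScratchDirs field (filesystem key -> staging dir) in both classes.
      private final Map<String, String> fsScratchDirs = new HashMap<>();

      // Mirrors the getter the patch adds to Context and PathInfo.
      public Map<String, String> getFsScratchDirs() {
        return fsScratchDirs;
      }

      // Plays the role of Context#removeScratchDir(), reached from Driver#close():
      // every registered dir is deleted, then the registry is emptied.
      public void removeScratchDirs() {
        for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) {
          // Real code deletes recursively: path.getFileSystem(conf).delete(path, true)
          System.out.println("deleting scratch dir: " + entry.getValue());
        }
        fsScratchDirs.clear();
      }

      public static void main(String[] args) {
        ScratchDirSketch driverCtx = new ScratchDirSketch();    // driverContext.getCtx()
        ScratchDirSketch replPathInfo = new ScratchDirSketch(); // context.pathInfo

        // During bootstrap load, PathInfo registers each staging dir it computes.
        replPathInfo.getFsScratchDirs().put("hdfs-example-key",
            "/warehouse/example.db/t/.hive-staging_example");

        // The one-line change in ReplLoadTask: hand the repl-side staging dirs to
        // the driver's Context so they are removed when the driver closes.
        driverCtx.getFsScratchDirs().putAll(replPathInfo.getFsScratchDirs());

        driverCtx.removeScratchDirs(); // happens automatically on Driver#close()
      }
    }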