diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java new file mode 100644 index 0000000000..4e018821f5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CloseableThreadLocal<T extends AutoCloseable> { + + private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class); + + private final ConcurrentHashMap<Thread, T> threadLocalMap; + private final Supplier<T> initialValue; + + public CloseableThreadLocal(Supplier<T> initialValue, int poolSize) { + this.initialValue = initialValue; + threadLocalMap = new ConcurrentHashMap<>(poolSize); + } + + public T get() { + return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get()); + } + + public void close() { + threadLocalMap.values().forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable autoCloseable) { + try { + autoCloseable.close(); + } catch (Exception e) { + LOG.warn("Error while closing resource.", e); + } + } +}
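For reviewers: a minimal usage sketch of the new helper, assuming it is driven the same way the migration code below drives it. The pool size and the client construction here are illustrative, not prescribed by the class:

```java
// Sketch: each worker thread lazily gets its own metastore client via get();
// the same thread always sees the same instance, and close() tears down
// every thread's instance quietly at the end of the run.
CloseableThreadLocal<HiveMetaStoreClient> clients = new CloseableThreadLocal<>(() -> {
  try {
    return new HiveMetaStoreClient(new HiveConf());
  } catch (Exception e) {
    throw new RuntimeException(e); // the Supplier cannot throw checked exceptions
  }
}, 4); // sizing hint for the backing ConcurrentHashMap, e.g. dbPoolSize + tablePoolSize
try {
  // submit work to a thread pool; tasks call clients.get() as needed
} finally {
  clients.close(); // closes all per-thread clients, logging (not rethrowing) failures
}
```

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java index 80025b7046..4585ed0659 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java @@ -19,10 +19,14 @@ package org.apache.hadoop.hive.ql.util; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -39,21 +43,17 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.cli.CommonCliOptions; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import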
org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.TransactionalValidationListener; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils; @@ -61,8 +61,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.HiveParser.switchDatabaseStatement_return; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; @@ -71,6 +69,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; public class HiveStrictManagedMigration { @@ -85,24 +84,30 @@ MANAGED // Migrate tables as managed transactional tables } - static class RunOptions { - String dbRegex; - String tableRegex; - String oldWarehouseRoot; - TableMigrationOption migrationOption; - boolean shouldModifyManagedTableLocation; - boolean shouldModifyManagedTableOwner; - boolean shouldModifyManagedTablePermissions; - boolean dryRun; - - public RunOptions(String dbRegex, - String tableRegex, - String oldWarehouseRoot, - TableMigrationOption migrationOption, - boolean shouldModifyManagedTableLocation, - boolean shouldModifyManagedTableOwner, - boolean shouldModifyManagedTablePermissions, - boolean dryRun) { + private static class RunOptions { + final String dbRegex; + final String tableRegex; + final String oldWarehouseRoot; + final TableMigrationOption migrationOption; + final boolean shouldModifyManagedTableLocation; + final boolean shouldModifyManagedTableOwner; + final boolean shouldModifyManagedTablePermissions; + final boolean dryRun; + final TableType tableType; + final int databasePoolSize; + final int tablePoolSize; + + RunOptions(String dbRegex, + String tableRegex, + String oldWarehouseRoot, + TableMigrationOption migrationOption, + boolean shouldModifyManagedTableLocation, + boolean shouldModifyManagedTableOwner, + boolean shouldModifyManagedTablePermissions, + boolean dryRun, + TableType tableType, + int databasePoolSize, + int tablePoolSize) { super(); this.dbRegex = dbRegex; this.tableRegex = tableRegex; @@ -112,6 +117,53 @@ public RunOptions(String dbRegex, this.shouldModifyManagedTableOwner = shouldModifyManagedTableOwner; this.shouldModifyManagedTablePermissions = shouldModifyManagedTablePermissions; this.dryRun = dryRun; + this.tableType = tableType; + this.databasePoolSize = databasePoolSize; + this.tablePoolSize = tablePoolSize; + } + + public RunOptions setShouldModifyManagedTableLocation(boolean shouldModifyManagedTableLocation) { + return new RunOptions( + this.dbRegex, + 
this.tableRegex, + this.oldWarehouseRoot, + this.migrationOption, + shouldModifyManagedTableLocation, + this.shouldModifyManagedTableOwner, + this.shouldModifyManagedTablePermissions, + this.dryRun, + this.tableType, + this.databasePoolSize, + this.tablePoolSize); + } + } + + private static class OwnerPermsOptions { + final String ownerName; + final String groupName; + final FsPermission dirPerms; + final FsPermission filePerms; + + OwnerPermsOptions(String ownerName, String groupName, FsPermission dirPerms, FsPermission filePerms) { + this.ownerName = ownerName; + this.groupName = groupName; + this.dirPerms = dirPerms; + this.filePerms = filePerms; + } + } + + private static class WarehouseRootCheckResult { + final boolean shouldModifyManagedTableLocation; + final Path curWhRootPath; + final HadoopShims.HdfsEncryptionShim encryptionShim; + + WarehouseRootCheckResult( + boolean shouldModifyManagedTableLocation, + Path curWhRootPath, + HadoopShims.HdfsEncryptionShim encryptionShim) { + this.shouldModifyManagedTableLocation = shouldModifyManagedTableLocation; + this.curWhRootPath = curWhRootPath; + this.encryptionShim = encryptionShim; } } @@ -136,7 +188,15 @@ public static void main(String[] args) throws Exception { int rc = 0; HiveStrictManagedMigration migration = null; try { - migration = new HiveStrictManagedMigration(runOptions); + HiveConf conf = hiveConf == null ? new HiveConf() : hiveConf; + WarehouseRootCheckResult warehouseRootCheckResult = checkOldWarehouseRoot(runOptions, conf); + runOptions = runOptions.setShouldModifyManagedTableLocation( + warehouseRootCheckResult.shouldModifyManagedTableLocation); + boolean createExternalDirsForDbs = checkExternalWarehouseDir(conf); + OwnerPermsOptions ownerPermsOptions = checkOwnerPermsOptions(runOptions, conf); + + migration = new HiveStrictManagedMigration( + conf, runOptions, createExternalDirsForDbs, ownerPermsOptions, warehouseRootCheckResult); migration.run(); } catch (Exception err) { LOG.error("Failed with error", err); @@ -148,7 +208,9 @@ public static void main(String[] args) throws Exception { } // TODO: Something is preventing the process from terminating after main(), adding exit() as hacky solution. 
- System.exit(rc); + if (hiveConf == null) { + System.exit(rc); + } } static Options createOptions() { @@ -156,66 +218,87 @@ static Options createOptions() { // -hiveconf x=y result.addOption(OptionBuilder - .withValueSeparator() - .hasArgs(2) - .withArgName("property=value") - .withLongOpt("hiveconf") - .withDescription("Use value for given property") - .create()); + .withValueSeparator() + .hasArgs(2) + .withArgName("property=value") + .withLongOpt("hiveconf") + .withDescription("Use value for given property") + .create()); result.addOption(OptionBuilder - .withLongOpt("dryRun") - .withDescription("Show what migration actions would be taken without actually running commands") - .create()); + .withLongOpt("dryRun") + .withDescription("Show what migration actions would be taken without actually running commands") + .create()); result.addOption(OptionBuilder - .withLongOpt("dbRegex") - .withDescription("Regular expression to match database names on which this tool will be run") - .hasArg() - .create('d')); + .withLongOpt("dbRegex") + .withDescription("Regular expression to match database names on which this tool will be run") + .hasArg() + .create('d')); result.addOption(OptionBuilder - .withLongOpt("tableRegex") - .withDescription("Regular expression to match table names on which this tool will be run") - .hasArg() - .create('t')); + .withLongOpt("tableRegex") + .withDescription("Regular expression to match table names on which this tool will be run") + .hasArg() + .create('t')); result.addOption(OptionBuilder - .withLongOpt("oldWarehouseRoot") - .withDescription("Location of the previous warehouse root") - .hasArg() - .create()); + .withLongOpt("oldWarehouseRoot") + .withDescription("Location of the previous warehouse root") + .hasArg() + .create()); result.addOption(OptionBuilder - .withLongOpt("migrationOption") - .withDescription("Table migration option (automatic|external|managed|validate|none)") - .hasArg() - .create('m')); + .withLongOpt("migrationOption") + .withDescription("Table migration option (automatic|external|managed|validate|none)") + .hasArg() + .create('m')); result.addOption(OptionBuilder - .withLongOpt("shouldModifyManagedTableLocation") - .withDescription("Whether managed tables should have their data moved from the old warehouse path to the current warehouse path") - .create()); + .withLongOpt("shouldModifyManagedTableLocation") + .withDescription("Whether managed tables should have their data moved from the old warehouse path to the current warehouse path") + .create()); result.addOption(OptionBuilder - .withLongOpt("shouldModifyManagedTableOwner") - .withDescription("Whether managed tables should have their directory owners changed to the hive user") - .create()); + .withLongOpt("shouldModifyManagedTableOwner") + .withDescription("Whether managed tables should have their directory owners changed to the hive user") + .create()); result.addOption(OptionBuilder - .withLongOpt("shouldModifyManagedTablePermissions") - .withDescription("Whether managed tables should have their directory permissions changed to conform to strict managed tables mode") - .create()); + .withLongOpt("shouldModifyManagedTablePermissions") + .withDescription("Whether managed tables should have their directory permissions changed to conform to strict managed tables mode") + .create()); result.addOption(OptionBuilder - .withLongOpt("modifyManagedTables") - .withDescription("This setting enables the shouldModifyManagedTableLocation, shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions 
options") - .create()); + .withLongOpt("modifyManagedTables") + .withDescription("This setting enables the shouldModifyManagedTableLocation, shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions options") + .create()); result.addOption(OptionBuilder - .withLongOpt("help") - .withDescription("print help message") - .create('h')); + .withLongOpt("help") + .withDescription("print help message") + .create('h')); + + result.addOption(OptionBuilder + .withLongOpt("databasePoolSize") + .withDescription("Number of threads to process databases.") + .hasArg() + .create("dn")); + + result.addOption(OptionBuilder + .withLongOpt("tablePoolSize") + .withDescription("Number of threads to process tables.") + .hasArg() + .create("tn")); + + result.addOption(OptionBuilder + .withLongOpt("tableType") + .withDescription(String.format("Table type to match tables on which this tool will be run. " + + "Possible values: %s Default: all tables", + Arrays.stream(TableType.values()).map(Enum::name).collect(Collectors.joining("|")))) + .hasArg() + .withArgName("table type") + .create("tt")); return result; } @@ -251,6 +334,22 @@ static RunOptions createRunOptions(CommandLine cli) throws Exception { String oldWarehouseRoot = cli.getOptionValue("oldWarehouseRoot"); boolean dryRun = cli.hasOption("dryRun"); + String tableTypeText = cli.getOptionValue("tableType"); + + int defaultPoolSize = Runtime.getRuntime().availableProcessors() / 2; + if (defaultPoolSize < 1) { + defaultPoolSize = 1; + } + + int databasePoolSize = getIntOptionValue(cli, "databasePoolSize", defaultPoolSize); + if (databasePoolSize < 1) { + throw new IllegalArgumentException("Please specify a positive integer option value for databasePoolSize"); + } + int tablePoolSize = getIntOptionValue(cli, "tablePoolSize", defaultPoolSize); + if (tablePoolSize < 1) { + throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize"); + } + RunOptions runOpts = new RunOptions( dbRegex, tableRegex, @@ -259,75 +358,142 @@ static RunOptions createRunOptions(CommandLine cli) throws Exception { shouldModifyManagedTableLocation, shouldModifyManagedTableOwner, shouldModifyManagedTablePermissions, - dryRun); + dryRun, + tableTypeText == null ? 
null : TableType.valueOf(tableTypeText), + databasePoolSize, + tablePoolSize); return runOpts; } - private RunOptions runOptions; - private HiveConf conf; - private HiveMetaStoreClient hms; - private boolean failedValidationChecks; - private boolean failuresEncountered; - private Warehouse wh; - private Warehouse oldWh; - private String ownerName; - private String groupName; - private FsPermission dirPerms; - private FsPermission filePerms; - private boolean createExternalDirsForDbs; - Path curWhRootPath; - private HadoopShims.HdfsEncryptionShim encryptionShim; - - HiveStrictManagedMigration(RunOptions runOptions) { - this.runOptions = runOptions; - this.conf = new HiveConf(); + private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) { + if (commandLine.hasOption(optionName)) { + try { + return Integer.parseInt(commandLine.getOptionValue(optionName)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e); + } + } + return defaultValue; } - void run() throws Exception { - wh = new Warehouse(conf); - checkOldWarehouseRoot(); - checkExternalWarehouseDir(); - checkOwnerPermsOptions(); - - hms = new HiveMetaStoreClient(conf);//MetaException - try { - List<String> databases = hms.getAllDatabases();//TException - LOG.info("Found {} databases", databases.size()); - for (String dbName : databases) { - if (dbName.matches(runOptions.dbRegex)) { - try { - processDatabase(dbName); - } catch (Exception err) { - LOG.error("Error processing database " + dbName, err); - failuresEncountered = true; - } + private final HiveConf conf; + private RunOptions runOptions; + private final boolean createExternalDirsForDbs; + private final Path curWhRootPath; + private final HadoopShims.HdfsEncryptionShim encryptionShim; + private final String ownerName; + private final String groupName; + private final FsPermission dirPerms; + private final FsPermission filePerms; + + private CloseableThreadLocal<HiveMetaStoreClient> hms; + private ThreadLocal<Warehouse> wh; + private ThreadLocal<Warehouse> oldWh; + private CloseableThreadLocal<HiveUpdater> hiveUpdater; + + private AtomicBoolean failuresEncountered; + private AtomicBoolean failedValidationChecks; + + HiveStrictManagedMigration(HiveConf conf, RunOptions runOptions, boolean createExternalDirsForDbs, + OwnerPermsOptions ownerPermsOptions, WarehouseRootCheckResult warehouseRootCheckResult) { + this.conf = conf; + this.runOptions = runOptions; + this.createExternalDirsForDbs = createExternalDirsForDbs; + this.ownerName = ownerPermsOptions.ownerName; + this.groupName = ownerPermsOptions.groupName; + this.dirPerms = ownerPermsOptions.dirPerms; + this.filePerms = ownerPermsOptions.filePerms; + this.curWhRootPath = warehouseRootCheckResult.curWhRootPath; + this.encryptionShim = warehouseRootCheckResult.encryptionShim; + + this.hms = new CloseableThreadLocal<>(() -> { + try { + HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(conf); + if (hiveConf != null) { + SessionState ss = SessionState.start(conf); + ss.applyAuthorizationPolicy(); } + return hiveMetaStoreClient; + } catch (Exception e) { + throw new RuntimeException(e); } + }, runOptions.databasePoolSize + runOptions.tablePoolSize); + wh = ThreadLocal.withInitial(() -> { + try { + return new Warehouse(conf); + } catch (MetaException e) { + throw new RuntimeException(e); + } + }); + if (runOptions.shouldModifyManagedTableLocation) { + Configuration oldConf = new
Configuration(conf); + HiveConf.setVar(oldConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot); + + oldWh = ThreadLocal.withInitial(() -> { + try { + return new Warehouse(oldConf); + } catch (MetaException e) { + throw new RuntimeException(e); + } + }); } + this.hiveUpdater = new CloseableThreadLocal<>(() -> { + try { + return new HiveUpdater(conf, true); + } catch (HiveException e) { + throw new RuntimeException(e); + } + }, runOptions.databasePoolSize + runOptions.tablePoolSize); - if (failuresEncountered) { + this.failuresEncountered = new AtomicBoolean(false); + this.failedValidationChecks = new AtomicBoolean(false); + } + + void run() throws Exception { + + List<String> databases = hms.get().getDatabases(runOptions.dbRegex); //TException + LOG.info("Found {} databases", databases.size()); + ForkJoinPool databasePool = new ForkJoinPool( + runOptions.databasePoolSize, + new NamedForkJoinWorkerThreadFactory("Database-"), + getUncaughtExceptionHandler(), + false); + ForkJoinPool tablePool = new ForkJoinPool( + runOptions.tablePoolSize, + new NamedForkJoinWorkerThreadFactory("Table-"), + getUncaughtExceptionHandler(), + false); + databasePool.submit(() -> databases.parallelStream().forEach(dbName -> processDatabase(dbName, tablePool))).get(); + LOG.info("Done processing databases."); + + if (failuresEncountered.get()) { throw new HiveException("One or more failures encountered during processing."); } - if (failedValidationChecks) { + if (failedValidationChecks.get()) { throw new HiveException("One or more tables failed validation checks for strict managed table mode."); } } - void checkOldWarehouseRoot() throws IOException, MetaException { + private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { + return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e); + }
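For reviewers: run() above layers two ForkJoinPools — databases fan out on the outer pool while each database fans its tables out on the inner pool handed to processDatabase. A self-contained sketch of the same pattern (pool sizes, names, and task bodies are placeholders):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class TwoLevelPoolSketch {
  public static void main(String[] args) throws Exception {
    ForkJoinPool dbPool = new ForkJoinPool(2);    // plays the role of databasePoolSize
    ForkJoinPool tablePool = new ForkJoinPool(4); // plays the role of tablePoolSize
    List<String> dbs = Arrays.asList("db1", "db2");
    dbPool.submit(() -> dbs.parallelStream().forEach(db -> {
      List<String> tables = Arrays.asList(db + ".t1", db + ".t2");
      try {
        // a dedicated inner pool keeps table-level work from starving the db pool
        tablePool.submit(() -> tables.parallelStream()
            .forEach(t -> System.out.println(Thread.currentThread().getName() + " -> " + t)))
            .get();
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    })).get(); // block until every database task has completed
  }
}
```

+ + static WarehouseRootCheckResult checkOldWarehouseRoot(RunOptions runOptions, HiveConf conf) throws IOException { + boolean shouldModifyManagedTableLocation = runOptions.shouldModifyManagedTableLocation; + Path curWhRootPath = null; + HadoopShims.HdfsEncryptionShim encryptionShim = null; + if (runOptions.shouldModifyManagedTableLocation) { if (runOptions.oldWarehouseRoot == null) { LOG.info("oldWarehouseRoot is not specified. Disabling shouldModifyManagedTableLocation"); - runOptions.shouldModifyManagedTableLocation = false; + shouldModifyManagedTableLocation = false; } else { String curWarehouseRoot = HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREWAREHOUSE); if (arePathsEqual(conf, runOptions.oldWarehouseRoot, curWarehouseRoot)) { LOG.info("oldWarehouseRoot is the same as the current warehouse root {}." + " Disabling shouldModifyManagedTableLocation", runOptions.oldWarehouseRoot); - runOptions.shouldModifyManagedTableLocation = false; + shouldModifyManagedTableLocation = false; } else { Path oldWhRootPath = new Path(runOptions.oldWarehouseRoot); curWhRootPath = new Path(curWarehouseRoot); @@ -339,18 +505,18 @@ void checkOldWarehouseRoot() throws IOException, MetaException { LOG.info("oldWarehouseRoot {} has a different FS than the current warehouse root {}." + " Disabling shouldModifyManagedTableLocation", runOptions.oldWarehouseRoot, curWarehouseRoot); - runOptions.shouldModifyManagedTableLocation = false; + shouldModifyManagedTableLocation = false; } else { if (!isHdfs(oldWhRootFs)) { LOG.info("Warehouse is using non-HDFS FileSystem {}.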
Disabling shouldModifyManagedTableLocation", oldWhRootFs.getUri()); - runOptions.shouldModifyManagedTableLocation = false; + shouldModifyManagedTableLocation = false; } else { encryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(oldWhRootFs, conf); if (!hasEquivalentEncryption(encryptionShim, oldWhRootPath, curWhRootPath)) { LOG.info("oldWarehouseRoot {} and current warehouse root {} have different encryption zones." + " Disabling shouldModifyManagedTableLocation", oldWhRootPath, curWhRootPath); - runOptions.shouldModifyManagedTableLocation = false; + shouldModifyManagedTableLocation = false; } } } @@ -358,14 +524,15 @@ void checkOldWarehouseRoot() throws IOException, MetaException { } } - if (runOptions.shouldModifyManagedTableLocation) { - Configuration oldWhConf = new Configuration(conf); - HiveConf.setVar(oldWhConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot); - oldWh = new Warehouse(oldWhConf); - } + return new WarehouseRootCheckResult(shouldModifyManagedTableLocation, curWhRootPath, encryptionShim); } - void checkOwnerPermsOptions() { + static OwnerPermsOptions checkOwnerPermsOptions(RunOptions runOptions, HiveConf conf) { + String ownerName = null; + String groupName = null; + FsPermission dirPerms = null; + FsPermission filePerms = null; + if (runOptions.shouldModifyManagedTableOwner) { ownerName = conf.get("strict.managed.tables.migration.owner", "hive"); groupName = conf.get("strict.managed.tables.migration.group", null); @@ -380,61 +547,69 @@ void checkOwnerPermsOptions() { filePerms = new FsPermission(filePermsString); } } + + return new OwnerPermsOptions(ownerName, groupName, dirPerms, filePerms); } - void checkExternalWarehouseDir() { + static boolean checkExternalWarehouseDir(HiveConf conf) { String externalWarehouseDir = conf.getVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL); - if (externalWarehouseDir != null && !externalWarehouseDir.isEmpty()) { - createExternalDirsForDbs = true; - } + return externalWarehouseDir != null && !externalWarehouseDir.isEmpty(); } - void processDatabase(String dbName) throws IOException, HiveException, MetaException, TException { - LOG.info("Processing database {}", dbName); - Database dbObj = hms.getDatabase(dbName); + void processDatabase(String dbName, ForkJoinPool tablePool) { + try { + LOG.info("Processing database {}", dbName); + Database dbObj = hms.get().getDatabase(dbName); - boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj); - if (modifyDefaultManagedLocation) { - Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName); + boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj); + if (modifyDefaultManagedLocation) { + Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName); - LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation); - if (!runOptions.dryRun) { - FileSystem fs = newDefaultDbLocation.getFileSystem(conf); - FileUtils.mkdir(fs, newDefaultDbLocation, conf); - // Set appropriate owner/perms of the DB dir only, no need to recurse - checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation, - ownerName, groupName, dirPerms, null, runOptions.dryRun, false); + LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation); + if (!runOptions.dryRun) { + FileSystem fs = newDefaultDbLocation.getFileSystem(conf); + FileUtils.mkdir(fs, newDefaultDbLocation, conf); + // Set appropriate owner/perms of the DB dir only, no need to recurse + checkAndSetFileOwnerPermissions(fs, 
newDefaultDbLocation, + ownerName, groupName, dirPerms, null, runOptions.dryRun, false); + } } - if (createExternalDirsForDbs) { - createExternalDbDir(dbObj); - } + if (createExternalDirsForDbs) { + createExternalDbDir(dbObj); + } - boolean errorsInThisDb = false; - List<String> tableNames = hms.getTables(dbName, runOptions.tableRegex); - for (String tableName : tableNames) { - // If we did not change the DB location, there is no need to move the table directories. - try { - processTable(dbObj, tableName, modifyDefaultManagedLocation); - } catch (Exception err) { - LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), err); - failuresEncountered = true; - errorsInThisDb = true; + List<String> tableNames; + if (runOptions.tableType == null) { + tableNames = hms.get().getTables(dbName, runOptions.tableRegex); + LOG.debug("found {} tables in {}", tableNames.size(), dbName); + } else { + tableNames = hms.get().getTables(dbName, runOptions.tableRegex, runOptions.tableType); + LOG.debug("found {} {}s in {}", tableNames.size(), runOptions.tableType.name(), dbName); } - } - // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB. - if (modifyDefaultManagedLocation) { + boolean errorsInThisDb = !tablePool.submit(() -> tableNames.parallelStream() + .map(tableName -> processTable(dbObj, tableName, modifyDefaultManagedLocation)) + .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2)).get(); if (errorsInThisDb) { - LOG.error("Not updating database location for {} since an error was encountered. The migration must be run again for this database.", - dbObj.getName()); - } else { - Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName); - // dbObj after this call would have the new DB location. - // Keep that in mind if anything below this requires the old DB path. - getHiveUpdater().updateDbLocation(dbObj, newDefaultDbLocation); + failuresEncountered.set(true); + } + + // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB. + if (modifyDefaultManagedLocation) { + if (errorsInThisDb) { + LOG.error("Not updating database location for {} since an error was encountered. The migration must be run again for this database.", + dbObj.getName()); + } else { + Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName); + // dbObj after this call would have the new DB location. + // Keep that in mind if anything below this requires the old DB path.
+ hiveUpdater.get().updateDbLocation(dbObj, newDefaultDbLocation); + } } + } catch (Exception ex) { + LOG.error("Error processing database " + dbName, ex); + failuresEncountered.set(true); } } @@ -464,44 +639,55 @@ public static boolean migrateTable(Table tableObj, TableType tableType, TableMig return false; } - void processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation) - throws HiveException, IOException, TException { - String dbName = dbObj.getName(); - LOG.debug("Processing table {}", getQualifiedName(dbName, tableName)); + boolean processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation) { + try { + String dbName = dbObj.getName(); + LOG.debug("Processing table {}", getQualifiedName(dbName, tableName)); - Table tableObj = hms.getTable(dbName, tableName); - TableType tableType = TableType.valueOf(tableObj.getTableType()); + Table tableObj = hms.get().getTable(dbName, tableName); + TableType tableType = TableType.valueOf(tableObj.getTableType()); - TableMigrationOption migrationOption = runOptions.migrationOption; - if (migrationOption == TableMigrationOption.AUTOMATIC) { - migrationOption = determineMigrationTypeAutomatically(tableObj, tableType, ownerName, conf, hms, null); - } + TableMigrationOption migrationOption = runOptions.migrationOption; + if (migrationOption == TableMigrationOption.AUTOMATIC) { + migrationOption = determineMigrationTypeAutomatically( + tableObj, tableType, ownerName, conf, hms.get(), null); + } - failedValidationChecks = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun, - getHiveUpdater(), hms, conf); + boolean failedValidationCheck = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun, + hiveUpdater.get(), hms.get(), conf); - if (!failedValidationChecks && (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE)) { - Path tablePath = new Path(tableObj.getSd().getLocation()); - if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) { - Path newTablePath = wh.getDnsPath( - new Path(wh.getDefaultDatabasePath(dbName), - MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); - moveTableData(dbObj, tableObj, newTablePath); - if (!runOptions.dryRun) { - // File ownership/permission checks should be done on the new table path. - tablePath = newTablePath; - } + if (failedValidationCheck) { + this.failedValidationChecks.set(true); + return true; } - if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) { - FileSystem fs = tablePath.getFileSystem(conf); - if (isHdfs(fs)) { - // TODO: what about partitions not in the default location? - checkAndSetFileOwnerPermissions(fs, tablePath, - ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true); + if (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) { + Path tablePath = new Path(tableObj.getSd().getLocation()); + if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) { + Path newTablePath = wh.get().getDnsPath( + new Path(wh.get().getDefaultDatabasePath(dbName), + MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); + moveTableData(dbObj, tableObj, newTablePath); + if (!runOptions.dryRun) { + // File ownership/permission checks should be done on the new table path. 
+ tablePath = newTablePath; + } + } + + if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) { + FileSystem fs = tablePath.getFileSystem(conf); + if (isHdfs(fs)) { + // TODO: what about partitions not in the default location? + checkAndSetFileOwnerPermissions(fs, tablePath, + ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true); + } + } + } + } catch (Exception ex) { + LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), ex); + return false; + } + return true; } boolean shouldModifyDatabaseLocation(Database dbObj) throws IOException, MetaException { @@ -510,7 +696,7 @@ boolean shouldModifyDatabaseLocation(Database dbObj) throws IOException, MetaExc // Check if the database location is in the default location based on the old warehouse root. // If so then change the database location to the default based on the current warehouse root. String dbLocation = dbObj.getLocationUri(); - Path oldDefaultDbLocation = oldWh.getDefaultDatabasePath(dbName); + Path oldDefaultDbLocation = oldWh.get().getDefaultDatabasePath(dbName); if (arePathsEqual(conf, dbLocation, oldDefaultDbLocation.toString())) { if (hasEquivalentEncryption(encryptionShim, oldDefaultDbLocation, curWhRootPath)) { return true; @@ -529,7 +715,7 @@ boolean shouldModifyTableLocation(Database dbObj, Table tableObj) throws IOExcep // If so then change the table location to the default based on the current warehouse root. // The existing table directory will also be moved to the new default database directory. String tableLocation = tableObj.getSd().getLocation(); - Path oldDefaultTableLocation = oldWh.getDefaultTablePath(dbObj, tableObj.getTableName()); + Path oldDefaultTableLocation = oldWh.get().getDefaultTablePath(dbObj, tableObj.getTableName()); if (arePathsEqual(conf, tableLocation, oldDefaultTableLocation.toString())) { if (hasEquivalentEncryption(encryptionShim, oldDefaultTableLocation, curWhRootPath)) { return true; @@ -545,7 +731,7 @@ boolean shouldModifyPartitionLocation(Database dbObj, Table tableObj, Partition throws IOException, MetaException { String tableName = tableObj.getTableName(); String partLocation = partObj.getSd().getLocation(); - Path oldDefaultPartLocation = oldWh.getDefaultPartitionPath(dbObj, tableObj, partSpec); + Path oldDefaultPartLocation = oldWh.get().getDefaultPartitionPath(dbObj, tableObj, partSpec); if (arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString())) { if (hasEquivalentEncryption(encryptionShim, oldDefaultPartLocation, curWhRootPath)) { return true; @@ -558,7 +744,7 @@ boolean shouldModifyPartitionLocation(Database dbObj, Table tableObj, Partition } void createExternalDbDir(Database dbObj) throws IOException, MetaException { - Path externalTableDbPath = wh.getDefaultExternalDatabasePath(dbObj.getName()); + Path externalTableDbPath = wh.get().getDefaultExternalDatabasePath(dbObj.getName()); FileSystem fs = externalTableDbPath.getFileSystem(conf); if (!fs.exists(externalTableDbPath)) { String dbOwner = ownerName; @@ -621,19 +807,19 @@ void moveTableData(Database dbObj, Table tableObj, Path newTablePath) throws Hiv // locations to be in sync. if (isPartitionedTable(tableObj)) { - List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE); + List<String> partNames = hms.get().listPartitionNames(dbName, tableName, Short.MAX_VALUE); // TODO: Fetch partitions in batches? // TODO: Threadpool to process partitions?
for (String partName : partNames) { - Partition partObj = hms.getPartition(dbName, tableName, partName); + Partition partObj = hms.get().getPartition(dbName, tableName, partName); Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObj.getPartitionKeys(), partObj.getValues()); if (shouldModifyPartitionLocation(dbObj, tableObj, partObj, partSpec)) { // Table directory (which includes the partition directory) has already been moved, // just update the partition location in the metastore. if (!runOptions.dryRun) { - Path newPartPath = wh.getPartitionPath(newTablePath, partSpec); - getHiveUpdater().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath); + Path newPartPath = wh.get().getPartitionPath(newTablePath, partSpec); + hiveUpdater.get().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath); } } } @@ -642,7 +828,7 @@ void moveTableData(Database dbObj, Table tableObj, Path newTablePath) throws Hiv // Finally update the table location. This would prevent this tool from processing this table again // on subsequent runs of the migration. if (!runOptions.dryRun) { - getHiveUpdater().updateTableLocation(tableObj, newTablePath); + hiveUpdater.get().updateTableLocation(tableObj, newTablePath); } } @@ -907,6 +1093,7 @@ static boolean shouldTablePathBeExternal(Table tableObj, String ownerName, Confi } void cleanup() { + hms.close(); if (hiveUpdater != null) { runAndLogErrors(() -> hiveUpdater.close()); hiveUpdater = null; @@ -917,13 +1104,6 @@ public static HiveUpdater getHiveUpdater(HiveConf conf) throws HiveException { return new HiveUpdater(conf, false); } - HiveUpdater getHiveUpdater() throws HiveException { - if (hiveUpdater == null) { - hiveUpdater = new HiveUpdater(conf, true); - } - return hiveUpdater; - } - private static final class TxnCtx { public final long writeId; public final String validWriteIds; @@ -936,7 +1116,7 @@ public TxnCtx(long writeId, String validWriteIds, long txnId) { } } - public static class HiveUpdater { + private static class HiveUpdater implements AutoCloseable { Hive hive; boolean doFileRename; @@ -946,9 +1126,10 @@ public TxnCtx(long writeId, String validWriteIds, long txnId) { doFileRename = fileRename; } - void close() { + @Override + public void close() { if (hive != null) { - runAndLogErrors(() -> Hive.closeCurrent()); + runAndLogErrors(Hive::closeCurrent); hive = null; } } @@ -1122,8 +1303,6 @@ void updateTableProperties(Table table, Map<String, String> props) throws HiveEx } } - HiveUpdater hiveUpdater; - interface ThrowableRunnable { void run() throws Exception; } @@ -1157,7 +1336,7 @@ static boolean isPartitionedTable(Table tableObj) { } static boolean isHdfs(FileSystem fs) { - return fs.getScheme().equals("hdfs"); + return scheme.equals(fs.getScheme()); } static String getQualifiedName(Table tableObj) { @@ -1335,4 +1514,12 @@ static boolean hasEquivalentEncryption(HadoopShims.HdfsEncryptionShim encryption } return true; } + + /** + * Can be set from tests when the config needs something other than the default values.
+ */ + @VisibleForTesting + static HiveConf hiveConf = null; + @VisibleForTesting + static String scheme = "hdfs"; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java new file mode 100644 index 0000000000..5b6eecc946 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.util; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; + +/** + * This class allows specifying a prefix for ForkJoinPool thread names. + */ +public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + + private final String namePrefix; + + NamedForkJoinWorkerThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + } + + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName(namePrefix + worker.getName()); + return worker; + } +}
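For reviewers: a quick sketch of what the factory buys us — recognizable per-pool thread names in logs. The constructor is package-private, so this assumes it runs from the same package; the prefix and handler are illustrative:

```java
package org.apache.hadoop.hive.ql.util;

import java.util.concurrent.ForkJoinPool;

public class NamedPoolSketch {
  public static void main(String[] args) throws Exception {
    ForkJoinPool pool = new ForkJoinPool(
        2,                                                 // parallelism
        new NamedForkJoinWorkerThreadFactory("Database-"), // prefix for worker thread names
        (t, e) -> System.err.println(t.getName() + " exited with error: " + e),
        false);                                            // asyncMode off, as in the patch
    // prints something like "Database-ForkJoinPool-1-worker-1"
    pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
    pool.shutdown();
  }
}
```

diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 5f39fdccb5..7039b89089 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hive.ql; +import java.io.File; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -40,13 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - public abstract class TxnCommandsBaseForTests { private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class); //bucket count for test tables; set it to 1 for easier debugging @@ -55,7 +55,7 @@ public TestName testName = new TestName(); protected HiveConf hiveConf; Driver d; - enum Table { + public enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart"), ACIDTBL2("acidTbl2"), diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java new file mode 100644 index 0000000000..f240ccacd8 --- /dev/null +++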
b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.util; + +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; + +public class CloseableThreadLocalTest { + + private static class AutoCloseableStub implements AutoCloseable { + + private boolean closed = false; + + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws Exception { + closed = true; + } + } + + @Test + public void testResourcesAreInitiallyNotClosed() { + CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal = + new CloseableThreadLocal<>(AutoCloseableStub::new, 1); + + assertThat(closeableThreadLocal.get().isClosed(), is(false)); + } + + @Test + public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException { + CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal = + new CloseableThreadLocal<>(AutoCloseableStub::new, 2); + + AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get(); + AutoCloseableStub syncInstance = closeableThreadLocal.get(); + + closeableThreadLocal.close(); + + assertThat(asyncInstance.isClosed(), is(true)); + assertThat(syncInstance.isClosed(), is(true)); + } + + @Test + public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() { + CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal = + new CloseableThreadLocal<>(AutoCloseableStub::new, 2); + + AutoCloseableStub ref1 = closeableThreadLocal.get(); + AutoCloseableStub ref2 = closeableThreadLocal.get(); + assertThat(ref1, is(ref2)); + } + + @Test + public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException { + CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal = + new CloseableThreadLocal<>(AutoCloseableStub::new, 2); + + AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get(); + AutoCloseableStub syncInstance = closeableThreadLocal.get(); + assertThat(asyncInstance, is(not(syncInstance))); + } +} \ No newline at end of file diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java new file mode 100644 index 0000000000..057135bbcb --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/util/TestHiveStrictManagedMigration.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.util; + +import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBL; +import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.ACIDTBLPART; +import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDNONBUCKET; +import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL; +import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.Table.NONACIDORCTBL2; + +import java.io.File; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests; +import org.junit.Assert; +import org.junit.Test; + +public class TestHiveStrictManagedMigration extends TxnCommandsBaseForTests { + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestHiveStrictManagedMigration.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + + @Test + public void testUpgrade() throws Exception { + int[][] data = {{1, 2}, {3, 4}, {5, 6}}; + runStatementOnDriver("DROP TABLE IF EXISTS test.TAcid"); + runStatementOnDriver("DROP DATABASE IF EXISTS test"); + + runStatementOnDriver("CREATE DATABASE test"); + runStatementOnDriver( + "CREATE TABLE test.TAcid (a int, b int) CLUSTERED BY (b) INTO 2 BUCKETS STORED AS orc TBLPROPERTIES" + + " ('transactional'='true')"); + runStatementOnDriver("INSERT INTO test.TAcid" + makeValuesClause(data)); + + runStatementOnDriver( + "CREATE EXTERNAL TABLE texternal (a int, b int)"); + + String oldWarehouse = getWarehouseDir(); + String[] args = {"--hiveconf", "hive.strict.managed.tables=true", "-m", "automatic", "--modifyManagedTables", + "--oldWarehouseRoot", oldWarehouse}; + HiveConf newConf = new HiveConf(hiveConf); + File newWarehouseDir = new File(getTestDataDir(), "newWarehouse"); + newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, newWarehouseDir.getAbsolutePath()); + newConf.set("strict.managed.tables.migration.owner", System.getProperty("user.name")); + HiveStrictManagedMigration.hiveConf = newConf; + HiveStrictManagedMigration.scheme = "file"; + HiveStrictManagedMigration.main(args); + + Assert.assertTrue(newWarehouseDir.exists()); + Assert.assertTrue(new File(newWarehouseDir, ACIDTBL.toString().toLowerCase()).exists()); + Assert.assertTrue(new File(newWarehouseDir, ACIDTBLPART.toString().toLowerCase()).exists()); + Assert.assertTrue(new File(newWarehouseDir, NONACIDNONBUCKET.toString().toLowerCase()).exists()); + Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL.toString().toLowerCase()).exists()); + Assert.assertTrue(new File(newWarehouseDir, NONACIDORCTBL2.toString().toLowerCase()).exists()); + Assert.assertTrue(new File(new File(newWarehouseDir, "test.db"), "tacid").exists()); + Assert.assertTrue(new File(oldWarehouse, 
"texternal").exists()); + } + + @Override + protected String getTestDataDir() { + return TEST_DATA_DIR; + } +}