diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java
new file mode 100644
index 0000000000..4e018821f5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/CloseableThreadLocal.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CloseableThreadLocal<T extends AutoCloseable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+  private final ConcurrentHashMap<Thread, T> threadLocalMap;
+  private final Supplier<T> initialValue;
+
+  public CloseableThreadLocal(Supplier<T> initialValue, int poolSize) {
+    this.initialValue = initialValue;
+    threadLocalMap = new ConcurrentHashMap<>(poolSize);
+  }
+
+  public T get() {
+    return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+  }
+
+  public void close() {
+    threadLocalMap.values().forEach(this::closeQuietly);
+  }
+
+  private void closeQuietly(AutoCloseable autoCloseable) {
+    try {
+      autoCloseable.close();
+    } catch (Exception e) {
+      LOG.warn("Error while closing resource.", e);
+    }
+  }
+}
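[Editor's note — usage sketch, not part of the patch.] CloseableThreadLocal differs from java.lang.ThreadLocal in one key way: instances are tracked in a ConcurrentHashMap keyed by Thread, so a single close() call issued from any thread can release the resources lazily created by all threads; poolSize is only an initial-capacity hint for that map. A minimal, self-contained sketch of the intended pattern, assuming a fixed worker pool (the class name CloseableThreadLocalSketch and the surrounding scaffolding are hypothetical; the choice of HiveMetaStoreClient and pool size 4 mirror how the migration tool below uses the class):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.ql.util.CloseableThreadLocal;

    public class CloseableThreadLocalSketch {
      public static void main(String[] args) throws Exception {
        // One metastore client per worker thread, created lazily on first get().
        // The Supplier cannot throw checked exceptions, hence the wrapping.
        CloseableThreadLocal<HiveMetaStoreClient> clients = new CloseableThreadLocal<>(() -> {
          try {
            return new HiveMetaStoreClient(new HiveConf());
          } catch (MetaException e) {
            throw new RuntimeException(e);
          }
        }, 4);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
          for (int i = 0; i < 8; i++) {
            pool.submit(() -> {
              try {
                // Same thread, same client; different threads, different clients.
                System.out.println(clients.get().getAllDatabases().size());
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            });
          }
        } finally {
          pool.shutdown();
          pool.awaitTermination(1, TimeUnit.MINUTES);
          clients.close(); // one call closes every thread's client
        }
      }
    }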
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
index 80025b7046..3e58c592bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -39,21 +41,17 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.cli.CommonCliOptions;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils;
@@ -61,9 +59,6 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.HiveParser.switchDatabaseStatement_return;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -85,15 +80,15 @@
     MANAGED // Migrate tables as managed transactional tables
   }
 
-  static class RunOptions {
-    String dbRegex;
-    String tableRegex;
-    String oldWarehouseRoot;
-    TableMigrationOption migrationOption;
-    boolean shouldModifyManagedTableLocation;
-    boolean shouldModifyManagedTableOwner;
-    boolean shouldModifyManagedTablePermissions;
-    boolean dryRun;
+  private static class RunOptions {
+    final String dbRegex;
+    final String tableRegex;
+    final String oldWarehouseRoot;
+    final TableMigrationOption migrationOption;
+    final boolean shouldModifyManagedTableLocation;
+    final boolean shouldModifyManagedTableOwner;
+    final boolean shouldModifyManagedTablePermissions;
+    final boolean dryRun;
 
     public RunOptions(String dbRegex,
         String tableRegex,
@@ -113,6 +108,44 @@ public RunOptions(String dbRegex,
       this.shouldModifyManagedTablePermissions = shouldModifyManagedTablePermissions;
       this.dryRun = dryRun;
     }
+
+    public RunOptions setShouldModifyManagedTableLocation(boolean shouldModifyManagedTableLocation) {
+      return new RunOptions(
+          this.dbRegex,
+          this.tableRegex,
+          this.oldWarehouseRoot,
+          this.migrationOption,
+          shouldModifyManagedTableLocation,
+          this.shouldModifyManagedTableOwner,
+          this.shouldModifyManagedTablePermissions,
+          this.dryRun);
+    }
+  }
+
+  private static class OwnerPermsOptions {
+    final String ownerName;
+    final String groupName;
+    final FsPermission dirPerms;
+    final FsPermission filePerms;
+
+    public OwnerPermsOptions(String ownerName, String groupName, FsPermission dirPerms, FsPermission filePerms) {
+      this.ownerName = ownerName;
+      this.groupName = groupName;
+      this.dirPerms = dirPerms;
+      this.filePerms = filePerms;
+    }
+  }
+
+  private static class WarehouseRootCheckResult {
+    final boolean shouldModifyManagedTableLocation;
+    final Path curWhRootPath;
+    final HadoopShims.HdfsEncryptionShim encryptionShim;
+
+    public WarehouseRootCheckResult(boolean shouldModifyManagedTableLocation, Path curWhRootPath, HadoopShims.HdfsEncryptionShim encryptionShim) {
+      this.shouldModifyManagedTableLocation = shouldModifyManagedTableLocation;
+      this.curWhRootPath = curWhRootPath;
+      this.encryptionShim = encryptionShim;
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -136,7 +169,15 @@ public static void main(String[] args) throws Exception {
     int rc = 0;
     HiveStrictManagedMigration migration = null;
     try {
-      migration = new HiveStrictManagedMigration(runOptions);
+      HiveConf conf = new HiveConf();
+      WarehouseRootCheckResult warehouseRootCheckResult = checkOldWarehouseRoot(runOptions, conf);
+      runOptions = runOptions.setShouldModifyManagedTableLocation(
+          warehouseRootCheckResult.shouldModifyManagedTableLocation);
+      boolean createExternalDirsForDbs = checkExternalWarehouseDir(conf);
+      OwnerPermsOptions ownerPermsOptions = checkOwnerPermsOptions(runOptions, conf);
+
+      migration = new HiveStrictManagedMigration(
+          conf, runOptions, createExternalDirsForDbs, ownerPermsOptions, warehouseRootCheckResult);
       migration.run();
     } catch (Exception err) {
       LOG.error("Failed with error", err);
@@ -263,71 +304,107 @@ static RunOptions createRunOptions(CommandLine cli) throws Exception {
     return runOpts;
   }
 
+  private final HiveConf conf;
   private RunOptions runOptions;
-  private HiveConf conf;
-  private HiveMetaStoreClient hms;
-  private boolean failedValidationChecks;
-  private boolean failuresEncountered;
-  private Warehouse wh;
-  private Warehouse oldWh;
-  private String ownerName;
-  private String groupName;
-  private FsPermission dirPerms;
-  private FsPermission filePerms;
-  private boolean createExternalDirsForDbs;
-  Path curWhRootPath;
-  private HadoopShims.HdfsEncryptionShim encryptionShim;
-
-  HiveStrictManagedMigration(RunOptions runOptions) {
+  private final boolean createExternalDirsForDbs;
+  private final Path curWhRootPath;
+  private final HadoopShims.HdfsEncryptionShim encryptionShim;
+  private final String ownerName;
+  private final String groupName;
+  private final FsPermission dirPerms;
+  private final FsPermission filePerms;
+
+  private CloseableThreadLocal<HiveMetaStoreClient> hms;
+  private ThreadLocal<Warehouse> wh;
+  private ThreadLocal<Warehouse> oldWh;
+  private CloseableThreadLocal<HiveUpdater> hiveUpdater;
+
+  private AtomicBoolean failuresEncountered;
+  private AtomicBoolean failedValidationChecks;
+
+  HiveStrictManagedMigration(HiveConf conf, RunOptions runOptions, boolean createExternalDirsForDbs,
+      OwnerPermsOptions ownerPermsOptions, WarehouseRootCheckResult warehouseRootCheckResult) {
+    this.conf = conf;
     this.runOptions = runOptions;
-    this.conf = new HiveConf();
+    this.createExternalDirsForDbs = createExternalDirsForDbs;
+    this.ownerName = ownerPermsOptions.ownerName;
+    this.groupName = ownerPermsOptions.groupName;
+    this.dirPerms = ownerPermsOptions.dirPerms;
+    this.filePerms = ownerPermsOptions.filePerms;
+    this.curWhRootPath = warehouseRootCheckResult.curWhRootPath;
+    this.encryptionShim = warehouseRootCheckResult.encryptionShim;
+
+    this.hms = new CloseableThreadLocal<>(() -> {
+      try {
+        return new HiveMetaStoreClient(conf);
+      } catch (MetaException e) {
+        throw new RuntimeException(e);
+      }
+    }, 4);
+    wh = ThreadLocal.withInitial(() -> {
+      try {
+        return new Warehouse(conf);
+      } catch (MetaException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    if (runOptions.shouldModifyManagedTableLocation) {
+      Configuration oldConf = new Configuration(conf);
+      HiveConf.setVar(oldConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
+
+      oldWh = ThreadLocal.withInitial(() -> {
+        try {
+          return new Warehouse(oldConf);
+        } catch (MetaException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    }
+    this.hiveUpdater = new CloseableThreadLocal<>(() -> {
+      try {
+        return new HiveUpdater(conf, true);
+      } catch (HiveException e) {
+        throw new RuntimeException(e);
+      }
+    }, 4);
+
+    this.failuresEncountered = new AtomicBoolean(false);
+    this.failedValidationChecks = new AtomicBoolean(false);
   }
 
   void run() throws Exception {
-    wh = new Warehouse(conf);
-    checkOldWarehouseRoot();
-    checkExternalWarehouseDir();
-    checkOwnerPermsOptions();
-    hms = new HiveMetaStoreClient(conf);//MetaException
-    try {
-      List<String> databases = hms.getAllDatabases();//TException
-      LOG.info("Found {} databases", databases.size());
-      for (String dbName : databases) {
-        if (dbName.matches(runOptions.dbRegex)) {
-          try {
-            processDatabase(dbName);
-          } catch (Exception err) {
-            LOG.error("Error processing database " + dbName, err);
-            failuresEncountered = true;
-          }
-        }
-      }
-      LOG.info("Done processing databases.");
-    } finally {
-      hms.close();
-    }
+    List<String> databases = hms.get().getAllDatabases();//TException
+    LOG.info("Found {} databases", databases.size());
+    ForkJoinPool databasePool = new ForkJoinPool(4);
+    ForkJoinPool tablePool = new ForkJoinPool(4);
+    databasePool.submit(() -> databases.parallelStream().forEach(dbName -> processDatabase(dbName, tablePool))).get();
+    LOG.info("Done processing databases.");
 
-    if (failuresEncountered) {
+    if (failuresEncountered.get()) {
       throw new HiveException("One or more failures encountered during processing.");
     }
-    if (failedValidationChecks) {
+    if (failedValidationChecks.get()) {
       throw new HiveException("One or more tables failed validation checks for strict managed table mode.");
     }
   }
 
-  void checkOldWarehouseRoot() throws IOException, MetaException {
+  static WarehouseRootCheckResult checkOldWarehouseRoot(RunOptions runOptions, HiveConf conf) throws IOException {
+    boolean shouldModifyManagedTableLocation = runOptions.shouldModifyManagedTableLocation;
+    Path curWhRootPath = null;
+    HadoopShims.HdfsEncryptionShim encryptionShim = null;
+
     if (runOptions.shouldModifyManagedTableLocation) {
       if (runOptions.oldWarehouseRoot == null) {
         LOG.info("oldWarehouseRoot is not specified. Disabling shouldModifyManagedTableLocation");
-        runOptions.shouldModifyManagedTableLocation = false;
+        shouldModifyManagedTableLocation = false;
       } else {
         String curWarehouseRoot = HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREWAREHOUSE);
         if (arePathsEqual(conf, runOptions.oldWarehouseRoot, curWarehouseRoot)) {
           LOG.info("oldWarehouseRoot is the same as the current warehouse root {}."
              + " Disabling shouldModifyManagedTableLocation", runOptions.oldWarehouseRoot);
-          runOptions.shouldModifyManagedTableLocation = false;
+          shouldModifyManagedTableLocation = false;
         } else {
           Path oldWhRootPath = new Path(runOptions.oldWarehouseRoot);
           curWhRootPath = new Path(curWarehouseRoot);
@@ -339,18 +416,18 @@ void checkOldWarehouseRoot() throws IOException, MetaException {
             LOG.info("oldWarehouseRoot {} has a different FS than the current warehouse root {}."
                 + " Disabling shouldModifyManagedTableLocation",
                 runOptions.oldWarehouseRoot, curWarehouseRoot);
-            runOptions.shouldModifyManagedTableLocation = false;
+            shouldModifyManagedTableLocation = false;
           } else {
             if (!isHdfs(oldWhRootFs)) {
               LOG.info("Warehouse is using non-HDFS FileSystem {}. Disabling shouldModifyManagedTableLocation",
                  oldWhRootFs.getUri());
-              runOptions.shouldModifyManagedTableLocation = false;
+              shouldModifyManagedTableLocation = false;
            } else {
              encryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(oldWhRootFs, conf);
              if (!hasEquivalentEncryption(encryptionShim, oldWhRootPath, curWhRootPath)) {
                LOG.info("oldWarehouseRoot {} and current warehouse root {} have different encryption zones."
                    + " Disabling shouldModifyManagedTableLocation", oldWhRootPath, curWhRootPath);
-                runOptions.shouldModifyManagedTableLocation = false;
+                shouldModifyManagedTableLocation = false;
              }
            }
          }
@@ -358,14 +435,15 @@ void checkOldWarehouseRoot() throws IOException, MetaException {
       }
     }
 
-    if (runOptions.shouldModifyManagedTableLocation) {
-      Configuration oldWhConf = new Configuration(conf);
-      HiveConf.setVar(oldWhConf, HiveConf.ConfVars.METASTOREWAREHOUSE, runOptions.oldWarehouseRoot);
-      oldWh = new Warehouse(oldWhConf);
-    }
+    return new WarehouseRootCheckResult(shouldModifyManagedTableLocation, curWhRootPath, encryptionShim);
   }
 
-  void checkOwnerPermsOptions() {
+  static OwnerPermsOptions checkOwnerPermsOptions(RunOptions runOptions, HiveConf conf) {
+    String ownerName = null;
+    String groupName = null;
+    FsPermission dirPerms = null;
+    FsPermission filePerms = null;
+
     if (runOptions.shouldModifyManagedTableOwner) {
       ownerName = conf.get("strict.managed.tables.migration.owner", "hive");
       groupName = conf.get("strict.managed.tables.migration.group", null);
@@ -380,61 +458,61 @@ void checkOwnerPermsOptions() {
         filePerms = new FsPermission(filePermsString);
       }
     }
+
+    return new OwnerPermsOptions(ownerName, groupName, dirPerms, filePerms);
   }
 
-  void checkExternalWarehouseDir() {
+  static boolean checkExternalWarehouseDir(HiveConf conf) {
     String externalWarehouseDir = conf.getVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL);
-    if (externalWarehouseDir != null && !externalWarehouseDir.isEmpty()) {
-      createExternalDirsForDbs = true;
-    }
+    return externalWarehouseDir != null && !externalWarehouseDir.isEmpty();
   }
 
-  void processDatabase(String dbName) throws IOException, HiveException, MetaException, TException {
-    LOG.info("Processing database {}", dbName);
-    Database dbObj = hms.getDatabase(dbName);
+  void processDatabase(String dbName, ForkJoinPool tablePool) {
+    try {
+      LOG.info("Processing database {}", dbName);
+      Database dbObj = hms.get().getDatabase(dbName);
 
-    boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj);
-    if (modifyDefaultManagedLocation) {
-      Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName);
+      boolean modifyDefaultManagedLocation = shouldModifyDatabaseLocation(dbObj);
+      if (modifyDefaultManagedLocation) {
+        Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName);
 
-      LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
-      if (!runOptions.dryRun) {
-        FileSystem fs = newDefaultDbLocation.getFileSystem(conf);
-        FileUtils.mkdir(fs, newDefaultDbLocation, conf);
-        // Set appropriate owner/perms of the DB dir only, no need to recurse
-        checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
-            ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
+        LOG.info("Changing location of database {} to {}", dbName, newDefaultDbLocation);
+        if (!runOptions.dryRun) {
+          FileSystem fs = newDefaultDbLocation.getFileSystem(conf);
+          FileUtils.mkdir(fs, newDefaultDbLocation, conf);
+          // Set appropriate owner/perms of the DB dir only, no need to recurse
+          checkAndSetFileOwnerPermissions(fs, newDefaultDbLocation,
+              ownerName, groupName, dirPerms, null, runOptions.dryRun, false);
+        }
       }
-    }
-
-    if (createExternalDirsForDbs) {
-      createExternalDbDir(dbObj);
-    }
 
-    boolean errorsInThisDb = false;
-    List<String> tableNames = hms.getTables(dbName, runOptions.tableRegex);
-    for (String tableName : tableNames) {
-      // If we did not change the DB location, there is no need to move the table directories.
-      try {
-        processTable(dbObj, tableName, modifyDefaultManagedLocation);
-      } catch (Exception err) {
-        LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), err);
-        failuresEncountered = true;
-        errorsInThisDb = true;
+      if (createExternalDirsForDbs) {
+        createExternalDbDir(dbObj);
      }
-    }
 
-    // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
-    if (modifyDefaultManagedLocation) {
+      List<String> tableNames = hms.get().getTables(dbName, runOptions.tableRegex);
+      boolean errorsInThisDb = !tablePool.submit(() -> tableNames.parallelStream()
+          .map(tableName -> processTable(dbObj, tableName, modifyDefaultManagedLocation))
+          .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2)).get();
       if (errorsInThisDb) {
-        LOG.error("Not updating database location for {} since an error was encountered. The migration must be run again for this database.",
-            dbObj.getName());
-      } else {
-        Path newDefaultDbLocation = wh.getDefaultDatabasePath(dbName);
-        // dbObj after this call would have the new DB location.
-        // Keep that in mind if anything below this requires the old DB path.
-        getHiveUpdater().updateDbLocation(dbObj, newDefaultDbLocation);
+        failuresEncountered.set(true);
+      }
+
+      // Finally update the DB location. This would prevent subsequent runs of the migration from processing this DB.
+      if (modifyDefaultManagedLocation) {
+        if (errorsInThisDb) {
+          LOG.error("Not updating database location for {} since an error was encountered. The migration must be run again for this database.",
+              dbObj.getName());
+        } else {
+          Path newDefaultDbLocation = wh.get().getDefaultDatabasePath(dbName);
+          // dbObj after this call would have the new DB location.
+          // Keep that in mind if anything below this requires the old DB path.
+          hiveUpdater.get().updateDbLocation(dbObj, newDefaultDbLocation);
+        }
       }
+    } catch (Exception ex) {
+      LOG.error("Error processing database " + dbName, ex);
+      failuresEncountered.set(true);
     }
   }
 
@@ -464,44 +542,55 @@ public static boolean migrateTable(Table tableObj, TableType tableType, TableMig
     return false;
   }
 
-  void processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation)
-      throws HiveException, IOException, TException {
-    String dbName = dbObj.getName();
-    LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
+  boolean processTable(Database dbObj, String tableName, boolean modifyDefaultManagedLocation) {
+    try {
+      String dbName = dbObj.getName();
+      LOG.debug("Processing table {}", getQualifiedName(dbName, tableName));
 
-    Table tableObj = hms.getTable(dbName, tableName);
-    TableType tableType = TableType.valueOf(tableObj.getTableType());
+      Table tableObj = hms.get().getTable(dbName, tableName);
+      TableType tableType = TableType.valueOf(tableObj.getTableType());
 
-    TableMigrationOption migrationOption = runOptions.migrationOption;
-    if (migrationOption == TableMigrationOption.AUTOMATIC) {
-      migrationOption = determineMigrationTypeAutomatically(tableObj, tableType, ownerName, conf, hms, null);
-    }
+      TableMigrationOption migrationOption = runOptions.migrationOption;
+      if (migrationOption == TableMigrationOption.AUTOMATIC) {
+        migrationOption = determineMigrationTypeAutomatically(
+            tableObj, tableType, ownerName, conf, hms.get(), null);
+      }
 
-    failedValidationChecks = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
-        getHiveUpdater(), hms, conf);
+      boolean failedValidationChecks = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun,
+          hiveUpdater.get(), hms.get(), conf);
 
-    if (!failedValidationChecks && (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE)) {
-      Path tablePath = new Path(tableObj.getSd().getLocation());
-      if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) {
-        Path newTablePath = wh.getDnsPath(
-            new Path(wh.getDefaultDatabasePath(dbName),
-                MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
-        moveTableData(dbObj, tableObj, newTablePath);
-        if (!runOptions.dryRun) {
-          // File ownership/permission checks should be done on the new table path.
-          tablePath = newTablePath;
-        }
+      if (failedValidationChecks) {
+        this.failedValidationChecks.set(true);
+        return true;
       }
-      if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
-        FileSystem fs = tablePath.getFileSystem(conf);
-        if (isHdfs(fs)) {
-          // TODO: what about partitions not in the default location?
-          checkAndSetFileOwnerPermissions(fs, tablePath,
-              ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
+
+      if (TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
+        Path tablePath = new Path(tableObj.getSd().getLocation());
+        if (modifyDefaultManagedLocation && shouldModifyTableLocation(dbObj, tableObj)) {
+          Path newTablePath = wh.get().getDnsPath(
+              new Path(wh.get().getDefaultDatabasePath(dbName),
+                  MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
+          moveTableData(dbObj, tableObj, newTablePath);
+          if (!runOptions.dryRun) {
+            // File ownership/permission checks should be done on the new table path.
+            tablePath = newTablePath;
+          }
+        }
+
+        if (runOptions.shouldModifyManagedTableOwner || runOptions.shouldModifyManagedTablePermissions) {
+          FileSystem fs = tablePath.getFileSystem(conf);
+          if (isHdfs(fs)) {
+            // TODO: what about partitions not in the default location?
+            checkAndSetFileOwnerPermissions(fs, tablePath,
+                ownerName, groupName, dirPerms, filePerms, runOptions.dryRun, true);
+          }
        }
      }
+    } catch (Exception ex) {
+      LOG.error("Error processing table " + getQualifiedName(dbObj.getName(), tableName), ex);
+      return false;
    }
+    return true;
  }
 
@@ -510,7 +599,7 @@ boolean shouldModifyDatabaseLocation(Database dbObj) throws IOException, MetaExc
     // Check if the database location is in the default location based on the old warehouse root.
     // If so then change the database location to the default based on the current warehouse root.
     String dbLocation = dbObj.getLocationUri();
-    Path oldDefaultDbLocation = oldWh.getDefaultDatabasePath(dbName);
+    Path oldDefaultDbLocation = oldWh.get().getDefaultDatabasePath(dbName);
     if (arePathsEqual(conf, dbLocation, oldDefaultDbLocation.toString())) {
       if (hasEquivalentEncryption(encryptionShim, oldDefaultDbLocation, curWhRootPath)) {
         return true;
@@ -529,7 +618,7 @@ boolean shouldModifyTableLocation(Database dbObj, Table tableObj) throws IOExcep
     // If so then change the table location to the default based on the current warehouse root.
     // The existing table directory will also be moved to the new default database directory.
     String tableLocation = tableObj.getSd().getLocation();
-    Path oldDefaultTableLocation = oldWh.getDefaultTablePath(dbObj, tableObj.getTableName());
+    Path oldDefaultTableLocation = oldWh.get().getDefaultTablePath(dbObj, tableObj.getTableName());
     if (arePathsEqual(conf, tableLocation, oldDefaultTableLocation.toString())) {
       if (hasEquivalentEncryption(encryptionShim, oldDefaultTableLocation, curWhRootPath)) {
         return true;
@@ -545,7 +634,7 @@ boolean shouldModifyPartitionLocation(Database dbObj, Table tableObj, Partition
       throws IOException, MetaException {
     String tableName = tableObj.getTableName();
     String partLocation = partObj.getSd().getLocation();
-    Path oldDefaultPartLocation = oldWh.getDefaultPartitionPath(dbObj, tableObj, partSpec);
+    Path oldDefaultPartLocation = oldWh.get().getDefaultPartitionPath(dbObj, tableObj, partSpec);
     if (arePathsEqual(conf, partLocation, oldDefaultPartLocation.toString())) {
       if (hasEquivalentEncryption(encryptionShim, oldDefaultPartLocation, curWhRootPath)) {
         return true;
@@ -558,7 +647,7 @@ boolean shouldModifyPartitionLocation(Database dbObj, Table tableObj, Partition
   }
 
   void createExternalDbDir(Database dbObj) throws IOException, MetaException {
-    Path externalTableDbPath = wh.getDefaultExternalDatabasePath(dbObj.getName());
+    Path externalTableDbPath = wh.get().getDefaultExternalDatabasePath(dbObj.getName());
     FileSystem fs = externalTableDbPath.getFileSystem(conf);
     if (!fs.exists(externalTableDbPath)) {
       String dbOwner = ownerName;
@@ -621,19 +710,19 @@ void moveTableData(Database dbObj, Table tableObj, Path newTablePath) throws Hiv
     // locations to be in sync.
 
     if (isPartitionedTable(tableObj)) {
-      List<String> partNames = hms.listPartitionNames(dbName, tableName, Short.MAX_VALUE);
+      List<String> partNames = hms.get().listPartitionNames(dbName, tableName, Short.MAX_VALUE);
       // TODO: Fetch partitions in batches?
       // TODO: Threadpool to process partitions?
       for (String partName : partNames) {
-        Partition partObj = hms.getPartition(dbName, tableName, partName);
+        Partition partObj = hms.get().getPartition(dbName, tableName, partName);
         Map<String, String> partSpec =
            Warehouse.makeSpecFromValues(tableObj.getPartitionKeys(), partObj.getValues());
        if (shouldModifyPartitionLocation(dbObj, tableObj, partObj, partSpec)) {
          // Table directory (which includes the partition directory) has already been moved,
          // just update the partition location in the metastore.
          if (!runOptions.dryRun) {
-            Path newPartPath = wh.getPartitionPath(newTablePath, partSpec);
-            getHiveUpdater().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
+            Path newPartPath = wh.get().getPartitionPath(newTablePath, partSpec);
+            hiveUpdater.get().updatePartitionLocation(dbName, tableObj, partName, partObj, newPartPath);
          }
        }
      }
@@ -642,7 +731,7 @@ void moveTableData(Database dbObj, Table tableObj, Path newTablePath) throws Hiv
     // Finally update the table location. This would prevent this tool from processing this table again
     // on subsequent runs of the migration.
     if (!runOptions.dryRun) {
-      getHiveUpdater().updateTableLocation(tableObj, newTablePath);
+      hiveUpdater.get().updateTableLocation(tableObj, newTablePath);
    }
  }
 
@@ -907,6 +996,7 @@ static boolean shouldTablePathBeExternal(Table tableObj, String ownerName, Confi
   }
 
   void cleanup() {
+    hms.close();
     if (hiveUpdater != null) {
       runAndLogErrors(() -> hiveUpdater.close());
       hiveUpdater = null;
@@ -917,13 +1007,6 @@ public static HiveUpdater getHiveUpdater(HiveConf conf) throws HiveException {
     return new HiveUpdater(conf, false);
   }
 
-  HiveUpdater getHiveUpdater() throws HiveException {
-    if (hiveUpdater == null) {
-      hiveUpdater = new HiveUpdater(conf, true);
-    }
-    return hiveUpdater;
-  }
-
   private static final class TxnCtx {
     public final long writeId;
     public final String validWriteIds;
@@ -936,7 +1019,7 @@ public TxnCtx(long writeId, String validWriteIds, long txnId) {
     }
   }
 
-  public static class HiveUpdater {
+  public static class HiveUpdater implements AutoCloseable {
     Hive hive;
     boolean doFileRename;
 
@@ -946,7 +1029,8 @@ public TxnCtx(long writeId, String validWriteIds, long txnId) {
       doFileRename = fileRename;
     }
 
-    void close() {
+    @Override
+    public void close() {
       if (hive != null) {
         runAndLogErrors(() -> Hive.closeCurrent());
         hive = null;
@@ -1122,8 +1206,6 @@ void updateTableProperties(Table table, Map<String, String> props) throws HiveEx
     }
   }
 
-  HiveUpdater hiveUpdater;
-
   interface ThrowableRunnable {
     void run() throws Exception;
   }
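[Editor's note — a sketch of the concurrency idiom above, not part of the patch.] run() and processDatabase() rely on submitting a task to a dedicated ForkJoinPool and starting the parallel stream from inside that task: the stream's subtasks then execute in the submitting pool rather than in ForkJoinPool.commonPool(), which caps the tool at 4 concurrent databases and 4 concurrent table tasks. This pool-capturing behavior is an implementation detail of java.util.stream, not a documented guarantee. The worker bodies must also not throw (an exception escaping forEach would abort sibling tasks), which is why processDatabase/processTable catch everything and record failures in AtomicBooleans. A stripped-down, self-contained illustration (all names here are hypothetical):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class BoundedParallelStreamSketch {
      private static final AtomicBoolean failuresEncountered = new AtomicBoolean(false);

      public static void main(String[] args) throws Exception {
        List<String> databases = Arrays.asList("db1", "db2", "db3", "db4", "db5");

        ForkJoinPool databasePool = new ForkJoinPool(4); // at most 4 workers
        try {
          // get() both waits for completion and rethrows anything that escaped.
          databasePool.submit(() -> databases.parallelStream()
              .forEach(BoundedParallelStreamSketch::process)).get();
        } finally {
          databasePool.shutdown();
        }
        if (failuresEncountered.get()) {
          throw new RuntimeException("One or more failures encountered during processing.");
        }
      }

      private static void process(String dbName) {
        try {
          System.out.println(Thread.currentThread().getName() + " processing " + dbName);
        } catch (Exception ex) {
          failuresEncountered.set(true); // never let one failure kill the sibling tasks
        }
      }
    }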
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java
new file mode 100644
index 0000000000..f240ccacd8
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/util/CloseableThreadLocalTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.util;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class CloseableThreadLocalTest {
+
+  private static class AutoCloseableStub implements AutoCloseable {
+
+    private boolean closed = false;
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() throws Exception {
+      closed = true;
+    }
+  }
+
+  @Test
+  public void testResourcesAreInitiallyNotClosed() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, 1);
+
+    assertThat(closeableThreadLocal.get().isClosed(), is(false));
+  }
+
+  @Test
+  public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+
+    closeableThreadLocal.close();
+
+    assertThat(asyncInstance.isClosed(), is(true));
+    assertThat(syncInstance.isClosed(), is(true));
+  }
+
+  @Test
+  public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    AutoCloseableStub ref1 = closeableThreadLocal.get();
+    AutoCloseableStub ref2 = closeableThreadLocal.get();
+    assertThat(ref1, is(ref2));
+  }
+
+  @Test
+  public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+    assertThat(asyncInstance, is(not(syncInstance)));
+  }
+}
\ No newline at end of file
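[Editor's note — an observation on close() semantics, with a hypothetical extra test; neither is part of the patch.] close() closes the tracked instances but does not remove them from the underlying map, so a subsequent get() on the same thread hands back the already-closed instance rather than a fresh one. The migration tool is unaffected because cleanup() runs only after run() has joined all submitted work, but the contract could be pinned down in this test class with something like:

    @Test
    public void testGetAfterCloseReturnsTheAlreadyClosedInstance() {
      CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
          new CloseableThreadLocal<>(AutoCloseableStub::new, 1);

      AutoCloseableStub instance = closeableThreadLocal.get();
      closeableThreadLocal.close();

      // close() iterates the tracked values but leaves the map intact.
      assertThat(closeableThreadLocal.get(), is(instance));
      assertThat(closeableThreadLocal.get().isClosed(), is(true));
    }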