diff --git upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
new file mode 100644
index 0000000000..fbe0a80d48
--- /dev/null
+++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides functionality similar to {@link java.lang.ThreadLocal}.
+ * In addition, it exposes a close function to clean up unmanaged resources in all threads where the resource was initialized.
+ * @param <T> type of the resource
+ */
+public class CloseableThreadLocal<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+  private final ConcurrentHashMap<Thread, T> threadLocalMap;
+  private final Supplier<T> initialValue;
+  private final Consumer<T> closeFunction;
+
+  public CloseableThreadLocal(Supplier<T> initialValue, Consumer<T> closeFunction, int poolSize) {
+    this.initialValue = initialValue;
+    threadLocalMap = new ConcurrentHashMap<>(poolSize);
+    this.closeFunction = closeFunction;
+  }
+
+  public T get() {
+    return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+  }
+
+  public void close() {
+    threadLocalMap.values().forEach(this::closeQuietly);
+  }
+
+  private void closeQuietly(T resource) {
+    try {
+      closeFunction.accept(resource);
+    } catch (Exception e) {
+      LOG.warn("Error while closing resource.", e);
+    }
+  }
+}
diff --git upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
new file mode 100644
index 0000000000..beb934c83e
--- /dev/null
+++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static java.util.Collections.emptyList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stores the result of the database and table scan: compaction commands and meta info.
+ */
+public final class CompactTablesState {
+
+  public static CompactTablesState empty() {
+    return new CompactTablesState(emptyList(), new CompactionMetaInfo());
+  }
+
+  public static CompactTablesState compactions(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+    return new CompactTablesState(compactionCommands, compactionMetaInfo);
+  }
+
+  private final List<String> compactionCommands;
+  private final CompactionMetaInfo compactionMetaInfo;
+
+  private CompactTablesState(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+    this.compactionCommands = compactionCommands;
+    this.compactionMetaInfo = compactionMetaInfo;
+  }
+
+  public List<String> getCompactionCommands() {
+    return compactionCommands;
+  }
+
+  public CompactionMetaInfo getMetaInfo() {
+    return compactionMetaInfo;
+  }
+
+  public CompactTablesState merge(CompactTablesState other) {
+    List<String> compactionCommands = new ArrayList<>(this.compactionCommands);
+    compactionCommands.addAll(other.compactionCommands);
+    return new CompactTablesState(compactionCommands, this.compactionMetaInfo.merge(other.compactionMetaInfo));
+  }
+}
diff --git upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
new file mode 100644
index 0000000000..72b4ec63a9
--- /dev/null
+++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Stores the result of compaction calls.
+ */
+public class CompactionMetaInfo {
+  /**
+   * Total number of bytes to be compacted across all compaction commands.
+   */
+  private long numberOfBytes;
+  /**
+   * IDs of compactions launched by this utility.
+   */
+  private final Set<Long> compactionIds;
+
+  public CompactionMetaInfo() {
+    compactionIds = new HashSet<>();
+    numberOfBytes = 0;
+  }
+
+  private CompactionMetaInfo(Set<Long> initialCompactionIds, long initialNumberOfBytes) {
+    this.compactionIds = new HashSet<>(initialCompactionIds);
+    numberOfBytes = initialNumberOfBytes;
+  }
+
+  public CompactionMetaInfo merge(CompactionMetaInfo other) {
+    CompactionMetaInfo result = new CompactionMetaInfo(this.compactionIds, this.numberOfBytes);
+    result.numberOfBytes += other.numberOfBytes;
+    result.compactionIds.addAll(other.compactionIds);
+    return result;
+  }
+
+  public long getNumberOfBytes() {
+    return numberOfBytes;
+  }
+
+  public void addBytes(long bytes) {
+    numberOfBytes += bytes;
+  }
+
+  public Set<Long> getCompactionIds() {
+    return compactionIds;
+  }
+
+  public void addCompactionId(long compactionId) {
+    compactionIds.add(compactionId);
+  }
+}
diff --git upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
new file mode 100644
index 0000000000..2b95f7be96
--- /dev/null
+++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+
+/**
+ * This class allows specifying a prefix for ForkJoinPool thread names.
+ */ +public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + + NamedForkJoinWorkerThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + } + + private final String namePrefix; + + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName(namePrefix + worker.getName()); + return worker; + } +} diff --git upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java index 0a7354d12b..848f28158c 100644 --- upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java +++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java @@ -29,9 +29,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import org.apache.commons.cli.CommandLine; @@ -57,6 +56,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; @@ -68,8 +68,8 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Reader; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.security.AccessControlException; @@ -116,26 +116,24 @@ * * See also org.apache.hadoop.hive.ql.util.UpgradeTool in Hive 3.x */ -public class PreUpgradeTool { +public class PreUpgradeTool implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class); private static final int PARTITION_BATCH_SIZE = 10000; - private final Options cmdLineOptions = new Options(); public static void main(String[] args) throws Exception { - PreUpgradeTool tool = new PreUpgradeTool(); - tool.init(); + Options cmdLineOptions = createCommandLineOptions(); CommandLineParser parser = new GnuParser(); CommandLine line ; try { - line = parser.parse(tool.cmdLineOptions, args); + line = parser.parse(cmdLineOptions, args); } catch (ParseException e) { System.err.println("PreUpgradeTool: Parsing failed. 
Reason: " + e.getLocalizedMessage()); - printAndExit(tool); + printAndExit(cmdLineOptions); return; } if (line.hasOption("help")) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("upgrade-acid", tool.cmdLineOptions); + formatter.printHelp("upgrade-acid", cmdLineOptions); return; } RunOptions runOptions = RunOptions.fromCommandLine(line); @@ -144,25 +142,53 @@ public static void main(String[] args) throws Exception { try { String hiveVer = HiveVersionInfo.getShortVersion(); LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " + - HiveVersionInfo.getBuildVersion()); + HiveVersionInfo.getBuildVersion()); if(!hiveVer.startsWith("2.")) { throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer); } - tool.prepareAcidUpgradeInternal(runOptions); + try (PreUpgradeTool tool = new PreUpgradeTool(runOptions)) { + tool.prepareAcidUpgradeInternal(); + } } catch(Exception ex) { LOG.error("PreUpgradeTool failed", ex); throw ex; } } - private static void printAndExit(PreUpgradeTool tool) { + + private final HiveConf conf; + private final CloseableThreadLocal metaStoreClient; + private final ThreadLocal txns; + private final RunOptions runOptions; + + public PreUpgradeTool(RunOptions runOptions) { + this.runOptions = runOptions; + this.conf = hiveConf != null ? hiveConf : new HiveConf(); + this.metaStoreClient = new CloseableThreadLocal<>(this::getHMS, IMetaStoreClient::close, + runOptions.getDatabasePoolSize() + runOptions.getTablePoolSize()); + this.txns = ThreadLocal.withInitial(() -> { + /* + This API changed from 2.x to 3.0. so this won't even compile with 3.0 + but it doesn't need to since we only run this preUpgrade + */ + try { + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + return TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); + } catch (MetaException e) { + throw new RuntimeException(e); + } + }); + } + + private static void printAndExit(Options cmdLineOptions) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("upgrade-acid", tool.cmdLineOptions); + formatter.printHelp("upgrade-acid", cmdLineOptions); System.exit(1); } - private void init() { + static Options createCommandLineOptions() { try { + Options cmdLineOptions = new Options(); cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" + " cluster. 
This requires 2.x binaries on the classpath and hive-site.xml.")); Option exec = new Option("execute", @@ -196,23 +222,38 @@ private void init() { tableTypeOption.setArgs(1); tableTypeOption.setArgName("table type"); cmdLineOptions.addOption(tableTypeOption); + + Option databasePoolSizeOption = new Option("dn", "Number of threads to process databases."); + databasePoolSizeOption.setLongOpt("databasePoolSize"); + databasePoolSizeOption.setArgs(1); + databasePoolSizeOption.setArgName("pool size"); + cmdLineOptions.addOption(databasePoolSizeOption); + + Option tablePoolSizeOption = new Option("tn", "Number of threads to process tables."); + tablePoolSizeOption.setLongOpt("tablePoolSize"); + tablePoolSizeOption.setArgs(1); + tablePoolSizeOption.setArgName("pool size"); + cmdLineOptions.addOption(tablePoolSizeOption); + + return cmdLineOptions; } catch(Exception ex) { LOG.error("init()", ex); throw ex; } } + private static HiveMetaHookLoader getHookLoader() { return new HiveMetaHookLoader() { @Override public HiveMetaHook getHook( - org.apache.hadoop.hive.metastore.api.Table tbl) { + org.apache.hadoop.hive.metastore.api.Table tbl) { return null; } }; } - private static IMetaStoreClient getHMS(HiveConf conf) { + public IMetaStoreClient getHMS() { UserGroupInformation loggedInUser = null; try { loggedInUser = UserGroupInformation.getLoginUser(); @@ -229,90 +270,57 @@ private static IMetaStoreClient getHMS(HiveConf conf) { which calls HiveMetaStoreClient(HiveConf, Boolean) which exists in (at least) 2.1.0.2.6.5.0-292 and later but not in 2.1.0.2.6.0.3-8 (the HDP 2.6 release) i.e. RetryingMetaStoreClient.getProxy(conf, true) is broken in 2.6.0*/ - return RetryingMetaStoreClient.getProxy(conf, - new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class}, - new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName()); - } catch (MetaException e) { + IMetaStoreClient client = RetryingMetaStoreClient.getProxy(conf, + new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class}, + new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName()); + if (hiveConf != null) { + SessionState ss = SessionState.start(conf); + ss.applyAuthorizationPolicy(); + } + return client; + } catch (MetaException | HiveException e) { throw new RuntimeException("Error connecting to Hive Metastore URI: " - + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); + + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); } } /** * todo: change script comments to a preamble instead of a footer */ - private void prepareAcidUpgradeInternal(RunOptions runOptions) + private void prepareAcidUpgradeInternal() throws HiveException, TException, IOException { - HiveConf conf = hiveConf != null ? 
hiveConf : new HiveConf(); - boolean isAcidEnabled = isAcidEnabled(conf); - IMetaStoreClient hms = getHMS(conf); + if (!isAcidEnabled(conf)) { + LOG.info("acid is off, there can't be any acid tables - nothing to compact"); + return; + } + IMetaStoreClient hms = metaStoreClient.get(); LOG.debug("Looking for databases"); String exceptionMsg = null; List databases; - List compactions = new ArrayList<>(); - final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); - ValidTxnList txns = null; - Hive db = null; + CompactTablesState compactTablesState; try { databases = hms.getDatabases(runOptions.getDbRegex()); //TException LOG.debug("Found " + databases.size() + " databases to process"); - if (runOptions.isExecute()) { - db = Hive.get(conf); - } - for (String dbName : databases) { - try { - List tables; - if (runOptions.getTableType() == null) { - tables = hms.getTables(dbName, runOptions.getTableRegex()); - LOG.debug("found {} tables in {}", tables.size(), dbName); - } else { - tables = hms.getTables(dbName, runOptions.getTableRegex(), runOptions.getTableType()); - LOG.debug("found {} {} in {}", tables.size(), runOptions.getTableType().name(), dbName); - } - for (String tableName : tables) { - try { - Table t = hms.getTable(dbName, tableName); - LOG.debug("processing table " + Warehouse.getQualifiedName(t)); - if (isAcidEnabled) { - //if acid is off, there can't be any acid tables - nothing to compact - if (txns == null) { - /* - This API changed from 2.x to 3.0. so this won't even compile with 3.0 - but it doesn't need to since we only run this preUpgrade - */ - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); - } - List compactionCommands = - getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), db, txns); - compactions.addAll(compactionCommands); - } - /*todo: handle renaming files somewhere*/ - } catch (Exception e) { - if (isAccessControlException(e)) { - // this could be external table with 0 permission for hive user - exceptionMsg = "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " + - "to databases and tables to determine if a table has to be compacted. " + - "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + - "false to allow read-access to databases and tables and retry the pre-upgrade tool again.."; - } - throw e; - } - } - } catch (Exception e) { - if (exceptionMsg == null && isAccessControlException(e)) { - // we may not have access to read all tables from this db - exceptionMsg = "Unable to access " + dbName + ". Pre-upgrade tool requires read-access " + - "to databases and tables to determine if a table has to be compacted. 
" + - "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + - "false to allow read-access to databases and tables and retry the pre-upgrade tool again.."; - } - throw e; - } - } + ForkJoinPool processDatabasePool = new ForkJoinPool( + runOptions.getDatabasePoolSize(), + new NamedForkJoinWorkerThreadFactory("Database-"), + getUncaughtExceptionHandler(), + false); + ForkJoinPool processTablePool = new ForkJoinPool( + runOptions.getTablePoolSize(), + new NamedForkJoinWorkerThreadFactory("Table-"), + getUncaughtExceptionHandler(), + false + ); + compactTablesState = processDatabasePool.submit( + () -> databases.parallelStream() + .map(dbName -> processDatabase(dbName, processTablePool, runOptions)) + .reduce(CompactTablesState::merge)).get() + .orElse(CompactTablesState.empty()); + } catch (Exception e) { - if (exceptionMsg == null && isAccessControlException(e)) { + if (isAccessControlException(e)) { exceptionMsg = "Unable to get databases. Pre-upgrade tool requires read-access " + "to databases and tables to determine if a table has to be compacted. " + "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + @@ -321,27 +329,27 @@ private void prepareAcidUpgradeInternal(RunOptions runOptions) throw new HiveException(exceptionMsg, e); } - makeCompactionScript(compactions, runOptions.getOutputDir(), compactionMetaInfo); + makeCompactionScript(compactTablesState, runOptions.getOutputDir()); if(runOptions.isExecute()) { - while(compactionMetaInfo.compactionIds.size() > 0) { - LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() + + while(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) { + LOG.debug("Will wait for " + compactTablesState.getMetaInfo().getCompactionIds().size() + " compactions to complete"); - ShowCompactResponse resp = db.showCompactions(); + ShowCompactResponse resp = hms.showCompactions(); for(ShowCompactResponseElement e : resp.getCompacts()) { final String state = e.getState(); boolean removed; switch (state) { case TxnStore.CLEANING_RESPONSE: case TxnStore.SUCCEEDED_RESPONSE: - removed = compactionMetaInfo.compactionIds.remove(e.getId()); + removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId()); if(removed) { LOG.debug("Required compaction succeeded: " + e.toString()); } break; case TxnStore.ATTEMPTED_RESPONSE: case TxnStore.FAILED_RESPONSE: - removed = compactionMetaInfo.compactionIds.remove(e.getId()); + removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId()); if(removed) { LOG.warn("Required compaction failed: " + e.toString()); } @@ -357,7 +365,7 @@ private void prepareAcidUpgradeInternal(RunOptions runOptions) LOG.error("Unexpected state for : " + e.toString()); } } - if(compactionMetaInfo.compactionIds.size() > 0) { + if(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) { try { if (callback != null) { callback.onWaitForCompaction(); @@ -371,6 +379,67 @@ private void prepareAcidUpgradeInternal(RunOptions runOptions) } } + private Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { + return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e); + } + + private CompactTablesState processDatabase( + String dbName, ForkJoinPool threadPool, RunOptions runOptions) { + try { + IMetaStoreClient hms = metaStoreClient.get(); + + List tables; + if (runOptions.getTableType() == null) { + tables = hms.getTables(dbName, runOptions.getTableRegex()); + LOG.debug("found {} tables 
in {}", tables.size(), dbName); + } + else { + tables = hms.getTables(dbName, runOptions.getTableRegex(), runOptions.getTableType()); + LOG.debug("found {} {} in {}", tables.size(), runOptions.getTableType().name(), dbName); + } + + return threadPool.submit( + () -> tables.parallelStream() + .map(table -> processTable(dbName, table, runOptions)) + .reduce(CompactTablesState::merge)).get() + .orElse(CompactTablesState.empty()); + } catch (Exception e) { + if (isAccessControlException(e)) { + // we may not have access to read all tables from this db + throw new RuntimeException("Unable to access " + dbName + ". Pre-upgrade tool requires read-access " + + "to databases and tables to determine if a table has to be compacted. " + + "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + + "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e); + } + throw new RuntimeException(e); + } + } + + private CompactTablesState processTable( + String dbName, String tableName, RunOptions runOptions) { + try { + IMetaStoreClient hms = metaStoreClient.get(); + final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); + + Table t = hms.getTable(dbName, tableName); + LOG.debug("processing table " + Warehouse.getQualifiedName(t)); + List compactionCommands = + getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), txns.get()); + return CompactTablesState.compactions(compactionCommands, compactionMetaInfo); + /*todo: handle renaming files somewhere*/ + } catch (Exception e) { + if (isAccessControlException(e)) { + // this could be external table with 0 permission for hive user + throw new RuntimeException( + "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " + + "to databases and tables to determine if a table has to be compacted. 
" + + "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + + "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e); + } + throw new RuntimeException(e); + } + } + private boolean isAccessControlException(final Exception e) { // hadoop security AccessControlException if ((e instanceof MetaException && e.getCause() instanceof AccessControlException) || @@ -391,25 +460,25 @@ private boolean isAccessControlException(final Exception e) { /** * Generates a set compaction commands to run on pre Hive 3 cluster */ - private static void makeCompactionScript(List commands, String scriptLocation, - CompactionMetaInfo compactionMetaInfo) throws IOException { - if (commands.isEmpty()) { + private static void makeCompactionScript(CompactTablesState result, String scriptLocation) throws IOException { + if (result.getCompactionCommands().isEmpty()) { LOG.info("No compaction is necessary"); return; } String fileName = "compacts_" + System.currentTimeMillis() + ".sql"; LOG.debug("Writing compaction commands to " + fileName); - try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) { + try(PrintWriter pw = createScript( + result.getCompactionCommands(), fileName, scriptLocation)) { //add post script - pw.println("-- Generated total of " + commands.size() + " compaction commands"); - if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) { + pw.println("-- Generated total of " + result.getCompactionCommands().size() + " compaction commands"); + if(result.getMetaInfo().getNumberOfBytes() < Math.pow(2, 20)) { //to see it working in UTs pw.println("-- The total volume of data to be compacted is " + - String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20))); + String.format("%.6fMB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 20))); } else { pw.println("-- The total volume of data to be compacted is " + - String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30))); + String.format("%.3fGB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 30))); } pw.println(); //todo: should be at the top of the file... 
@@ -438,7 +507,7 @@ private static PrintWriter createScript(List commands, String fileName, * @return any compaction commands to run for {@code Table t} */ private static List getCompactionCommands(Table t, HiveConf conf, - IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db, + IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, ValidTxnList txns) throws IOException, TException, HiveException { if(!isFullAcidTable(t)) { return Collections.emptyList(); @@ -452,7 +521,7 @@ private static PrintWriter createScript(List commands, String fileName, List cmds = new ArrayList<>(); cmds.add(getCompactionCommand(t, null)); if(execute) { - scheduleCompaction(t, null, db, compactionMetaInfo); + scheduleCompaction(t, null, hms, compactionMetaInfo); } return cmds; } @@ -463,19 +532,19 @@ private static PrintWriter createScript(List commands, String fileName, for(int i = 0; i < numWholeBatches; i++) { List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize)); - getCompactionCommands(t, partitionList, db, execute, compactionCommands, + getCompactionCommands(t, partitionList, hms, execute, compactionCommands, compactionMetaInfo, conf, txns); } if(numWholeBatches * batchSize < partNames.size()) { //last partial batch List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size())); - getCompactionCommands(t, partitionList, db, execute, compactionCommands, + getCompactionCommands(t, partitionList, hms, execute, compactionCommands, compactionMetaInfo, conf, txns); } return compactionCommands; } - private static void getCompactionCommands(Table t, List partitionList, Hive db, + private static void getCompactionCommands(Table t, List partitionList, IMetaStoreClient hms, boolean execute, List compactionCommands, CompactionMetaInfo compactionMetaInfo, HiveConf conf, ValidTxnList txns) throws IOException, TException, HiveException { @@ -483,28 +552,31 @@ private static void getCompactionCommands(Table t, List partitionList if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) { compactionCommands.add(getCompactionCommand(t, p)); if (execute) { - scheduleCompaction(t, p, db, compactionMetaInfo); + scheduleCompaction(t, p, hms, compactionMetaInfo); } } } } - private static void scheduleCompaction(Table t, Partition p, Hive db, + private static void scheduleCompaction(Table t, Partition p, IMetaStoreClient db, CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException { String partName = p == null ? null : Warehouse.makePartName(t.getPartitionKeys(), p.getValues()); - CompactionResponse resp = - //this gives an easy way to get at compaction ID so we can only wait for those this - //utility started - db.compact2(t.getDbName(), t.getTableName(), partName, "major", null); - if(!resp.isAccepted()) { - LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) + - " is already being compacted with id=" + resp.getId()); - } - else { - LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) + - (p == null ? 
"" : "/" + partName) + " with id=" + resp.getId()); - } - compactionMetaInfo.compactionIds.add(resp.getId()); + try { + CompactionResponse resp = + //this gives an easy way to get at compaction ID so we can only wait for those this + //utility started + db.compact2(t.getDbName(), t.getTableName(), partName, CompactionType.MAJOR, null); + if (!resp.isAccepted()) { + LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) + + " is already being compacted with id=" + resp.getId()); + } else { + LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) + + (p == null ? "" : "/" + partName) + " with id=" + resp.getId()); + } + compactionMetaInfo.addCompactionId(resp.getId()); + } catch (TException e) { + throw new HiveException(e); + } } /** @@ -547,14 +619,14 @@ public boolean accept(Path path) { } if(needsCompaction(bucket, fs)) { //found delete events - this 'location' needs compacting - compactionMetaInfo.numberOfBytes += getDataSize(location, conf); + compactionMetaInfo.addBytes(getDataSize(location, conf)); //if there are un-compacted original files, they will be included in compaction, so //count at the size for 'cost' estimation later for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) { FileStatus fileStatus = origFile.getFileStatus(); if(fileStatus != null) { - compactionMetaInfo.numberOfBytes += fileStatus.getLen(); + compactionMetaInfo.addBytes(fileStatus.getLen()); } } return true; @@ -664,15 +736,9 @@ private static boolean isAcidEnabled(HiveConf hiveConf) { return txnMgr.equals(dbTxnMgr) && concurrency; } - private static class CompactionMetaInfo { - /** - * total number of bytes to be compacted across all compaction commands - */ - long numberOfBytes; - /** - * IDs of compactions launched by this utility - */ - Set compactionIds = new HashSet<>(); + @Override + public void close() { + metaStoreClient.close(); } @VisibleForTesting diff --git upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java index 66213d424c..748023bfba 100644 --- upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java +++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java @@ -27,13 +27,38 @@ public static RunOptions fromCommandLine(CommandLine commandLine) { String tableTypeText = commandLine.getOptionValue("tableType"); + + int defaultPoolSize = Runtime.getRuntime().availableProcessors() / 2; + if (defaultPoolSize < 1) + defaultPoolSize = 1; + + int databasePoolSize = getIntOptionValue(commandLine, "databasePoolSize", defaultPoolSize); + if (databasePoolSize < 1) + throw new IllegalArgumentException("Please specify a positive integer option value for databasePoolSize"); + + int tablePoolSize = getIntOptionValue(commandLine, "tablePoolSize", defaultPoolSize); + if (tablePoolSize < 1) + throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize"); + return new RunOptions( commandLine.getOptionValue("location", "."), commandLine.hasOption("execute"), commandLine.getOptionValue("dbRegex", ".*"), commandLine.getOptionValue("tableRegex", ".*"), - tableTypeText == null ? null : TableType.valueOf(tableTypeText) - ); + tableTypeText == null ? 
null : TableType.valueOf(tableTypeText), + databasePoolSize, + tablePoolSize); + } + + private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) { + if (commandLine.hasOption(optionName)) { + try { + return Integer.parseInt(commandLine.getOptionValue(optionName)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e); + } + } + return defaultValue; } private final String outputDir; @@ -41,13 +66,17 @@ public static RunOptions fromCommandLine(CommandLine commandLine) { private final String dbRegex; private final String tableRegex; private final TableType tableType; + private final int databasePoolSize; + private final int tablePoolSize; - public RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, TableType tableType) { + private RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, TableType tableType, int databasePoolSize, int tablePoolSize) { this.outputDir = outputDir; this.execute = execute; this.dbRegex = dbRegex; this.tableRegex = tableRegex; this.tableType = tableType; + this.databasePoolSize = databasePoolSize; + this.tablePoolSize = tablePoolSize; } public String getOutputDir() { @@ -70,6 +99,14 @@ public TableType getTableType() { return tableType; } + public int getDatabasePoolSize() { + return databasePoolSize; + } + + public int getTablePoolSize() { + return tablePoolSize; + } + @Override public String toString() { return "RunOptions{" + @@ -78,6 +115,8 @@ public String toString() { ", dbRegex='" + dbRegex + '\'' + ", tableRegex='" + tableRegex + '\'' + ", tableType=" + tableType + + ", databasePoolSize=" + databasePoolSize + + ", tablePoolSize=" + tablePoolSize + '}'; } } diff --git upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocalTest.java upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocalTest.java new file mode 100644 index 0000000000..f6b2f0df4d --- /dev/null +++ upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocalTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class CloseableThreadLocalTest {
+
+  private static class AutoCloseableStub implements AutoCloseable {
+
+    private boolean closed = false;
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+  }
+
+  @Test
+  public void testResourcesAreInitiallyNotClosed() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 1);
+
+    assertThat(closeableThreadLocal.get().isClosed(), is(false));
+  }
+
+  @Test
+  public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+
+    closeableThreadLocal.close();
+
+    assertThat(asyncInstance.isClosed(), is(true));
+    assertThat(syncInstance.isClosed(), is(true));
+  }
+
+  @Test
+  public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub ref1 = closeableThreadLocal.get();
+    AutoCloseableStub ref2 = closeableThreadLocal.get();
+    assertThat(ref1, is(ref2));
+  }
+
+  @Test
+  public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+    assertThat(asyncInstance, is(not(syncInstance)));
+  }
+}
\ No newline at end of file
diff --git upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/RunOptionsTest.java upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/RunOptionsTest.java
new file mode 100644
index 0000000000..d1d83d196a
--- /dev/null
+++ upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/RunOptionsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hive.upgrade.acid; + +import static org.apache.hadoop.hive.upgrade.acid.PreUpgradeTool.createCommandLineOptions; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import org.apache.commons.cli.GnuParser; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class RunOptionsTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void testDatabasePoolSizeIs10WhenSpecified() throws Exception { + String[] args = {"-databasePoolSize", "10"}; + RunOptions runOptions = RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + assertThat(runOptions.getDatabasePoolSize(), is(10)); + } + + @Test + public void testExceptionIsThrownWhenDatabasePoolSizeIsNotANumber() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for databasePoolSize"); + + String[] args = {"-databasePoolSize", "notANumber"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenDatabasePoolSizeIsLessThan1() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for databasePoolSize"); + + String[] args = {"-databasePoolSize", "0"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenDatabasePoolSizeIsNotInteger() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for databasePoolSize"); + + String[] args = {"-databasePoolSize", "0.5"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testTablePoolSizeIs5WhenSpecified() throws Exception { + String[] args = {"-tablePoolSize", "5"}; + RunOptions runOptions = RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + assertThat(runOptions.getTablePoolSize(), is(5)); + } + + @Test + public void testExceptionIsThrownWhenTablePoolSizeIsNotANumber() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize"); + + String[] args = {"-tablePoolSize", "notANumber"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenTablePoolSizeIsLessThan1() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize"); + + String[] args = {"-tablePoolSize", "0"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenTablePoolSizeIsNotInteger() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize"); + + String[] args = {"-tablePoolSize", "0.5"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } +}
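
Usage note (reviewer illustration, not part of the patch): the sketch below shows how the new CloseableThreadLocal and NamedForkJoinWorkerThreadFactory classes are meant to be combined — each ForkJoinPool worker thread lazily creates its own copy of an expensive resource, and a single close() call releases every copy once the parallel work is done, mirroring how PreUpgradeTool handles per-thread IMetaStoreClient instances. The ExpensiveResource type and the CloseableThreadLocalUsageSketch class are hypothetical; the sketch lives in the same package only so it can use the package-private factory constructor.

package org.apache.hadoop.hive.upgrade.acid;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class CloseableThreadLocalUsageSketch {

  /** Hypothetical per-thread resource, standing in for e.g. an IMetaStoreClient. */
  static class ExpensiveResource implements AutoCloseable {
    String process(String item) {
      return Thread.currentThread().getName() + " processed " + item;
    }

    @Override
    public void close() {
      // release connections, file handles, etc.
    }
  }

  public static void main(String[] args) throws Exception {
    int poolSize = 4;
    // One resource per worker thread, created lazily on the first get() in that thread.
    CloseableThreadLocal<ExpensiveResource> resources =
        new CloseableThreadLocal<>(ExpensiveResource::new, ExpensiveResource::close, poolSize);
    ForkJoinPool pool = new ForkJoinPool(poolSize,
        new NamedForkJoinWorkerThreadFactory("Worker-"),
        (t, e) -> System.err.println(t.getName() + " failed: " + e), false);
    try {
      List<String> items = Arrays.asList("db1", "db2", "db3", "db4");
      // Running parallelStream() inside pool.submit() keeps the work on the custom ForkJoinPool.
      List<String> results = pool.submit(() ->
          items.parallelStream()
              .map(item -> resources.get().process(item))
              .collect(Collectors.toList())).get();
      results.forEach(System.out::println);
    } finally {
      pool.shutdown();
      // Closes every per-thread resource that was actually created.
      resources.close();
    }
  }
}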