diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
new file mode 100644
index 0000000000..ae36eb3445
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocal.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CloseableThreadLocal<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CloseableThreadLocal.class);
+
+  private final ConcurrentHashMap<Thread, T> threadLocalMap;
+  private final Supplier<T> initialValue;
+  private final Consumer<T> closeFunction;
+
+  public CloseableThreadLocal(Supplier<T> initialValue, Consumer<T> closeFunction, int poolSize) {
+    this.initialValue = initialValue;
+    threadLocalMap = new ConcurrentHashMap<>(poolSize);
+    this.closeFunction = closeFunction;
+  }
+
+  public T get() {
+    return threadLocalMap.computeIfAbsent(Thread.currentThread(), thread -> initialValue.get());
+  }
+
+  public void close() {
+    threadLocalMap.values().forEach(this::closeQuietly);
+  }
+
+  private void closeQuietly(T resource) {
+    try {
+      closeFunction.accept(resource);
+    } catch (Exception e) {
+      LOG.warn("Error while closing resource.", e);
+    }
+  }
+}
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
new file mode 100644
index 0000000000..8ee8b94ecf
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactTablesState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static java.util.Collections.emptyList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CompactTablesState {
+
+  public static CompactTablesState empty() {
+    return new CompactTablesState(emptyList(), new CompactionMetaInfo());
+  }
+
+  public static CompactTablesState compactions(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+    return new CompactTablesState(compactionCommands, compactionMetaInfo);
+  }
+
+  private final List<String> compactionCommands;
+  private final CompactionMetaInfo compactionMetaInfo;
+
+  private CompactTablesState(List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo) {
+    this.compactionCommands = compactionCommands;
+    this.compactionMetaInfo = compactionMetaInfo;
+  }
+
+  public List<String> getCompactionCommands() {
+    return compactionCommands;
+  }
+
+  public CompactionMetaInfo getMetaInfo() {
+    return compactionMetaInfo;
+  }
+
+  public CompactTablesState merge(CompactTablesState other) {
+    List<String> compactionCommands = new ArrayList<>(this.compactionCommands);
+    compactionCommands.addAll(other.compactionCommands);
+    return new CompactTablesState(compactionCommands, this.compactionMetaInfo.merge(other.compactionMetaInfo));
+  }
+}
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
new file mode 100644
index 0000000000..48073687e9
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/CompactionMetaInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CompactionMetaInfo {
+  /**
+   * total number of bytes to be compacted across all compaction commands
+   */
+  private long numberOfBytes;
+  /**
+   * IDs of compactions launched by this utility
+   */
+  private final Set<Long> compactionIds;
+
+  public CompactionMetaInfo() {
+    compactionIds = new HashSet<>();
+    numberOfBytes = 0;
+  }
+
+  private CompactionMetaInfo(Set<Long> initialCompactionIds, long initialNumberOfBytes) {
+    this.compactionIds = new HashSet<>(initialCompactionIds);
+    numberOfBytes = initialNumberOfBytes;
+  }
+
+  public CompactionMetaInfo merge(CompactionMetaInfo other) {
+    CompactionMetaInfo result = new CompactionMetaInfo(this.compactionIds, this.numberOfBytes);
+    result.numberOfBytes += other.numberOfBytes;
+    result.compactionIds.addAll(other.compactionIds);
+    return result;
+  }
+
+  public long getNumberOfBytes() {
+    return numberOfBytes;
+  }
+
+  public void addBytes(long bytes) {
+    numberOfBytes += bytes;
+  }
+
+  public Set<Long> getCompactionIds() {
+    return compactionIds;
+  }
+
+  public void addCompactionId(long compactionId) {
+    compactionIds.add(compactionId);
+  }
+}
diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
new file mode 100644
index 0000000000..22a5dca022
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/NamedForkJoinWorkerThreadFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.hadoop.hive.upgrade.acid; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; + +public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + + NamedForkJoinWorkerThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + } + + private final String namePrefix; + + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName(namePrefix + worker.getName()); + return worker; + } +} diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java index 0e3e3e2ed7..bf0fb05900 100644 --- a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java +++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java @@ -17,7 +17,20 @@ */ package org.apache.hadoop.hive.upgrade.acid; -import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ForkJoinPool; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -40,6 +53,7 @@ import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; @@ -49,10 +63,10 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.security.AccessControlException; @@ -62,20 +76,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString; +import com.google.common.annotations.VisibleForTesting; /** * This utility is designed to help with upgrading Hive 2.x to Hive 3.0. 
On-disk layout for
@@ -112,60 +113,79 @@
  *
  * See also org.apache.hadoop.hive.ql.util.UpgradeTool in Hive 3.x
  */
-public class PreUpgradeTool {
+public class PreUpgradeTool implements AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class);
   private static final int PARTITION_BATCH_SIZE = 10000;
-  private final Options cmdLineOptions = new Options();
 
   public static void main(String[] args) throws Exception {
-    PreUpgradeTool tool = new PreUpgradeTool();
-    tool.init();
+    Options cmdLineOptions = createCommandLineOptions();
     CommandLineParser parser = new GnuParser();
     CommandLine line ;
-    String outputDir = ".";
-    boolean execute = false;
     try {
-      line = parser.parse(tool.cmdLineOptions, args);
+      line = parser.parse(cmdLineOptions, args);
     } catch (ParseException e) {
       System.err.println("PreUpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
-      printAndExit(tool);
+      printAndExit(cmdLineOptions);
       return;
     }
     if (line.hasOption("help")) {
       HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+      formatter.printHelp("upgrade-acid", cmdLineOptions);
       return;
     }
-    if(line.hasOption("location")) {
-      outputDir = line.getOptionValue("location");
-    }
-    if(line.hasOption("execute")) {
-      execute = true;
-    }
-    LOG.info("Starting with execute=" + execute + ", location=" + outputDir);
+    RunOptions runOptions = RunOptions.fromCommandLine(line);
+    LOG.info("Starting with " + runOptions.toString());
 
     try {
       String hiveVer = HiveVersionInfo.getShortVersion();
       LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " +
-          HiveVersionInfo.getBuildVersion());
+              HiveVersionInfo.getBuildVersion());
       if(!hiveVer.startsWith("2.")) {
         throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer);
       }
-      tool.prepareAcidUpgradeInternal(outputDir, execute);
+      try (PreUpgradeTool tool = new PreUpgradeTool(runOptions)) {
+        tool.prepareAcidUpgradeInternal();
+      }
     } catch(Exception ex) {
       LOG.error("PreUpgradeTool failed", ex);
       throw ex;
     }
   }
-  private static void printAndExit(PreUpgradeTool tool) {
+
+  private final HiveConf conf;
+  private final CloseableThreadLocal<IMetaStoreClient> metaStoreClient;
+  private final ThreadLocal<ValidTxnList> txns;
+  private final RunOptions runOptions;
+
+  public PreUpgradeTool(RunOptions runOptions) {
+    this.runOptions = runOptions;
+    this.conf = hiveConf != null ? hiveConf : new HiveConf();
+    this.metaStoreClient = new CloseableThreadLocal<>(this::getHMS, IMetaStoreClient::close,
+            runOptions.getDatabasePoolSize() + runOptions.getTablePoolSize());
+    this.txns = ThreadLocal.withInitial(() -> {
+      /*
+       This API changed from 2.x to 3.0. so this won't even compile with 3.0
+       but it doesn't need to since we only run this preUpgrade
+      */
+      try {
+        TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+        return TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+      } catch (MetaException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  private static void printAndExit(Options cmdLineOptions) {
     HelpFormatter formatter = new HelpFormatter();
-    formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+    formatter.printHelp("upgrade-acid", cmdLineOptions);
     System.exit(1);
   }
 
-  private void init() {
+  static Options createCommandLineOptions() {
     try {
+      Options cmdLineOptions = new Options();
       cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" +
           " cluster.
This requires 2.x binaries on the classpath and hive-site.xml.")); Option exec = new Option("execute", @@ -174,23 +194,46 @@ private void init() { cmdLineOptions.addOption(exec); cmdLineOptions.addOption(new Option("location", true, "Location to write scripts to. Default is CWD.")); + + Option dbRegexOption = new Option("d", "Regular expression to match database names on which this tool will be run"); + dbRegexOption.setLongOpt("dbRegex"); + dbRegexOption.setArgs(1); + cmdLineOptions.addOption(dbRegexOption); + + Option tableRegexOption = new Option("t", "Regular expression to match table names on which this tool will be run"); + tableRegexOption.setLongOpt("tableRegex"); + tableRegexOption.setArgs(1); + cmdLineOptions.addOption(tableRegexOption); + + Option databasePoolSizeOption = new Option("dn", "Number of threads to process databases."); + databasePoolSizeOption.setLongOpt("databasePoolSize"); + databasePoolSizeOption.setArgs(1); + cmdLineOptions.addOption(databasePoolSizeOption); + + Option tablePoolSizeOption = new Option("tn", "Number of threads to process tables."); + tablePoolSizeOption.setLongOpt("tablePoolSize"); + tablePoolSizeOption.setArgs(1); + cmdLineOptions.addOption(tablePoolSizeOption); + + return cmdLineOptions; } catch(Exception ex) { LOG.error("init()", ex); throw ex; } } + private static HiveMetaHookLoader getHookLoader() { return new HiveMetaHookLoader() { @Override public HiveMetaHook getHook( - org.apache.hadoop.hive.metastore.api.Table tbl) { + org.apache.hadoop.hive.metastore.api.Table tbl) { return null; } }; } - private static IMetaStoreClient getHMS(HiveConf conf) { + public IMetaStoreClient getHMS() { UserGroupInformation loggedInUser = null; try { loggedInUser = UserGroupInformation.getLoginUser(); @@ -207,84 +250,57 @@ private static IMetaStoreClient getHMS(HiveConf conf) { which calls HiveMetaStoreClient(HiveConf, Boolean) which exists in (at least) 2.1.0.2.6.5.0-292 and later but not in 2.1.0.2.6.0.3-8 (the HDP 2.6 release) i.e. RetryingMetaStoreClient.getProxy(conf, true) is broken in 2.6.0*/ - return RetryingMetaStoreClient.getProxy(conf, - new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class}, - new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName()); - } catch (MetaException e) { + IMetaStoreClient client = RetryingMetaStoreClient.getProxy(conf, + new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class}, + new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName()); + if (hiveConf != null) { + SessionState ss = SessionState.start(conf); + ss.applyAuthorizationPolicy(); + } + return client; + } catch (MetaException | HiveException e) { throw new RuntimeException("Error connecting to Hive Metastore URI: " - + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); + + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); } } /** * todo: change script comments to a preamble instead of a footer */ - private void prepareAcidUpgradeInternal(String scriptLocation, boolean execute) + private void prepareAcidUpgradeInternal() throws HiveException, TException, IOException { - HiveConf conf = hiveConf != null ? 
hiveConf : new HiveConf(); - boolean isAcidEnabled = isAcidEnabled(conf); - IMetaStoreClient hms = getHMS(conf); + if (!isAcidEnabled(conf)) { + LOG.info("acid is off, there can't be any acid tables - nothing to compact"); + return; + } + IMetaStoreClient hms = metaStoreClient.get(); LOG.debug("Looking for databases"); String exceptionMsg = null; List databases; - List compactions = new ArrayList<>(); - final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); - ValidTxnList txns = null; - Hive db = null; + CompactTablesState compactTablesState; try { - databases = hms.getAllDatabases();//TException + databases = hms.getDatabases(runOptions.getDbRegex());//TException LOG.debug("Found " + databases.size() + " databases to process"); - if (execute) { - db = Hive.get(conf); - } - for (String dbName : databases) { - try { - List tables = hms.getAllTables(dbName); - LOG.debug("found " + tables.size() + " tables in " + dbName); - for (String tableName : tables) { - try { - Table t = hms.getTable(dbName, tableName); - LOG.debug("processing table " + Warehouse.getQualifiedName(t)); - if (isAcidEnabled) { - //if acid is off, there can't be any acid tables - nothing to compact - if (txns == null) { - /* - This API changed from 2.x to 3.0. so this won't even compile with 3.0 - but it doesn't need to since we only run this preUpgrade - */ - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); - } - List compactionCommands = - getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns); - compactions.addAll(compactionCommands); - } - /*todo: handle renaming files somewhere*/ - } catch (Exception e) { - if (isAccessControlException(e)) { - // this could be external table with 0 permission for hive user - exceptionMsg = "Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " + - "to databases and tables to determine if a table has to be compacted. " + - "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + - "false to allow read-access to databases and tables and retry the pre-upgrade tool again.."; - } - throw e; - } - } - } catch (Exception e) { - if (exceptionMsg == null && isAccessControlException(e)) { - // we may not have access to read all tables from this db - exceptionMsg = "Unable to access " + dbName + ". Pre-upgrade tool requires read-access " + - "to databases and tables to determine if a table has to be compacted. " + - "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + - "false to allow read-access to databases and tables and retry the pre-upgrade tool again.."; - } - throw e; - } - } + ForkJoinPool processDatabasePool = new ForkJoinPool( + runOptions.getDatabasePoolSize(), + new NamedForkJoinWorkerThreadFactory("Database-"), + null, + false); + ForkJoinPool processTablePool = new ForkJoinPool( + runOptions.getTablePoolSize(), + new NamedForkJoinWorkerThreadFactory("Table-"), + null, + false + ); + compactTablesState = processDatabasePool.submit( + () -> databases.parallelStream() + .map(dbName -> processDatabase(dbName, processTablePool, runOptions)) + .reduce(CompactTablesState::merge)).get() + .orElse(CompactTablesState.empty()); + } catch (Exception e) { - if (exceptionMsg == null && isAccessControlException(e)) { + if (isAccessControlException(e)) { exceptionMsg = "Unable to get databases. 
Pre-upgrade tool requires read-access " + "to databases and tables to determine if a table has to be compacted. " + "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + @@ -293,27 +309,27 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean execute) throw new HiveException(exceptionMsg, e); } - makeCompactionScript(compactions, scriptLocation, compactionMetaInfo); + makeCompactionScript(compactTablesState, runOptions.getOutputDir()); - if(execute) { - while(compactionMetaInfo.compactionIds.size() > 0) { - LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() + + if(runOptions.isExecute()) { + while(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) { + LOG.debug("Will wait for " + compactTablesState.getMetaInfo().getCompactionIds().size() + " compactions to complete"); - ShowCompactResponse resp = db.showCompactions(); + ShowCompactResponse resp = hms.showCompactions(); for(ShowCompactResponseElement e : resp.getCompacts()) { final String state = e.getState(); boolean removed; switch (state) { case TxnStore.CLEANING_RESPONSE: case TxnStore.SUCCEEDED_RESPONSE: - removed = compactionMetaInfo.compactionIds.remove(e.getId()); + removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId()); if(removed) { LOG.debug("Required compaction succeeded: " + e.toString()); } break; case TxnStore.ATTEMPTED_RESPONSE: case TxnStore.FAILED_RESPONSE: - removed = compactionMetaInfo.compactionIds.remove(e.getId()); + removed = compactTablesState.getMetaInfo().getCompactionIds().remove(e.getId()); if(removed) { LOG.warn("Required compaction failed: " + e.toString()); } @@ -329,20 +345,69 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean execute) LOG.error("Unexpected state for : " + e.toString()); } } - if(compactionMetaInfo.compactionIds.size() > 0) { + if(compactTablesState.getMetaInfo().getCompactionIds().size() > 0) { try { if (callback != null) { callback.onWaitForCompaction(); } Thread.sleep(pollIntervalMs); } catch (InterruptedException ex) { - ;//this only responds to ^C + //this only responds to ^C } } } } } + private CompactTablesState processDatabase( + String dbName, ForkJoinPool threadPool, RunOptions runOptions) { + try { + IMetaStoreClient hms = metaStoreClient.get(); + + List tables = hms.getTables(dbName, runOptions.getTableRegex()); + LOG.debug("found " + tables.size() + " tables in " + dbName); + + return threadPool.submit( + () -> tables.parallelStream() + .map(table -> processTable(dbName, table, runOptions)) + .reduce(CompactTablesState::merge)).get() + .orElse(CompactTablesState.empty()); + } catch (Exception e) { + if (isAccessControlException(e)) { + // we may not have access to read all tables from this db + throw new RuntimeException("Unable to access " + dbName + ". Pre-upgrade tool requires read-access " + + "to databases and tables to determine if a table has to be compacted. 
" + + "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + + "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e); + } + throw new RuntimeException(e); + } + } + + private CompactTablesState processTable( + String dbName, String tableName, RunOptions runOptions) { + try { + IMetaStoreClient hms = metaStoreClient.get(); + final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); + + Table t = hms.getTable(dbName, tableName); + LOG.debug("processing table " + Warehouse.getQualifiedName(t)); + List compactionCommands = + getCompactionCommands(t, conf, hms, compactionMetaInfo, runOptions.isExecute(), txns.get()); + return CompactTablesState.compactions(compactionCommands, compactionMetaInfo); + /*todo: handle renaming files somewhere*/ + } catch (Exception e) { + if (isAccessControlException(e)) { + // this could be external table with 0 permission for hive user + throw new RuntimeException("Unable to access " + dbName + "." + tableName + ". Pre-upgrade tool requires read-access " + + "to databases and tables to determine if a table has to be compacted. " + + "Set " + HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_AUTH_READS.varname + " config to " + + "false to allow read-access to databases and tables and retry the pre-upgrade tool again..", e); + } + throw new RuntimeException(e); + } + } + private boolean isAccessControlException(final Exception e) { // hadoop security AccessControlException if ((e instanceof MetaException && e.getCause() instanceof AccessControlException) || @@ -363,25 +428,25 @@ private boolean isAccessControlException(final Exception e) { /** * Generates a set compaction commands to run on pre Hive 3 cluster */ - private static void makeCompactionScript(List commands, String scriptLocation, - CompactionMetaInfo compactionMetaInfo) throws IOException { - if (commands.isEmpty()) { + private static void makeCompactionScript(CompactTablesState result, String scriptLocation) throws IOException { + if (result.getCompactionCommands().isEmpty()) { LOG.info("No compaction is necessary"); return; } String fileName = "compacts_" + System.currentTimeMillis() + ".sql"; LOG.debug("Writing compaction commands to " + fileName); - try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) { + try(PrintWriter pw = createScript( + result.getCompactionCommands(), fileName, scriptLocation)) { //add post script - pw.println("-- Generated total of " + commands.size() + " compaction commands"); - if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) { + pw.println("-- Generated total of " + result.getCompactionCommands().size() + " compaction commands"); + if(result.getMetaInfo().getNumberOfBytes() < Math.pow(2, 20)) { //to see it working in UTs pw.println("-- The total volume of data to be compacted is " + - String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20))); + String.format("%.6fMB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 20))); } else { pw.println("-- The total volume of data to be compacted is " + - String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30))); + String.format("%.3fGB", result.getMetaInfo().getNumberOfBytes()/Math.pow(2, 30))); } pw.println(); //todo: should be at the top of the file... 
@@ -410,7 +475,7 @@ private static PrintWriter createScript(List commands, String fileName, * @return any compaction commands to run for {@code Table t} */ private static List getCompactionCommands(Table t, HiveConf conf, - IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db, + IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, ValidTxnList txns) throws IOException, TException, HiveException { if(!isFullAcidTable(t)) { return Collections.emptyList(); @@ -424,7 +489,7 @@ private static PrintWriter createScript(List commands, String fileName, List cmds = new ArrayList<>(); cmds.add(getCompactionCommand(t, null)); if(execute) { - scheduleCompaction(t, null, db, compactionMetaInfo); + scheduleCompaction(t, null, hms, compactionMetaInfo); } return cmds; } @@ -435,19 +500,19 @@ private static PrintWriter createScript(List commands, String fileName, for(int i = 0; i < numWholeBatches; i++) { List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize)); - getCompactionCommands(t, partitionList, db, execute, compactionCommands, + getCompactionCommands(t, partitionList, hms, execute, compactionCommands, compactionMetaInfo, conf, txns); } if(numWholeBatches * batchSize < partNames.size()) { //last partial batch List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size())); - getCompactionCommands(t, partitionList, db, execute, compactionCommands, + getCompactionCommands(t, partitionList, hms, execute, compactionCommands, compactionMetaInfo, conf, txns); } return compactionCommands; } - private static void getCompactionCommands(Table t, List partitionList, Hive db, + private static void getCompactionCommands(Table t, List partitionList, IMetaStoreClient hms, boolean execute, List compactionCommands, CompactionMetaInfo compactionMetaInfo, HiveConf conf, ValidTxnList txns) throws IOException, TException, HiveException { @@ -455,28 +520,31 @@ private static void getCompactionCommands(Table t, List partitionList if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) { compactionCommands.add(getCompactionCommand(t, p)); if (execute) { - scheduleCompaction(t, p, db, compactionMetaInfo); + scheduleCompaction(t, p, hms, compactionMetaInfo); } } } } - private static void scheduleCompaction(Table t, Partition p, Hive db, + private static void scheduleCompaction(Table t, Partition p, IMetaStoreClient db, CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException { String partName = p == null ? null : Warehouse.makePartName(t.getPartitionKeys(), p.getValues()); - CompactionResponse resp = - //this gives an easy way to get at compaction ID so we can only wait for those this - //utility started - db.compact2(t.getDbName(), t.getTableName(), partName, "major", null); - if(!resp.isAccepted()) { - LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) + - " is already being compacted with id=" + resp.getId()); - } - else { - LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) + - (p == null ? 
"" : "/" + partName) + " with id=" + resp.getId()); - } - compactionMetaInfo.compactionIds.add(resp.getId()); + try { + CompactionResponse resp = + //this gives an easy way to get at compaction ID so we can only wait for those this + //utility started + db.compact2(t.getDbName(), t.getTableName(), partName, CompactionType.MAJOR, null); + if (!resp.isAccepted()) { + LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) + + " is already being compacted with id=" + resp.getId()); + } else { + LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) + + (p == null ? "" : "/" + partName) + " with id=" + resp.getId()); + } + compactionMetaInfo.addCompactionId(resp.getId()); + } catch (TException e) { + throw new HiveException(e); + } } /** @@ -519,14 +587,14 @@ public boolean accept(Path path) { } if(needsCompaction(bucket, fs)) { //found delete events - this 'location' needs compacting - compactionMetaInfo.numberOfBytes += getDataSize(location, conf); + compactionMetaInfo.addBytes(getDataSize(location, conf)); //if there are un-compacted original files, they will be included in compaction, so //count at the size for 'cost' estimation later for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) { FileStatus fileStatus = origFile.getFileStatus(); if(fileStatus != null) { - compactionMetaInfo.numberOfBytes += fileStatus.getLen(); + compactionMetaInfo.addBytes(fileStatus.getLen()); } } return true; @@ -636,15 +704,9 @@ private static boolean isAcidEnabled(HiveConf hiveConf) { return txnMgr.equals(dbTxnMgr) && concurrency; } - private static class CompactionMetaInfo { - /** - * total number of bytes to be compacted across all compaction commands - */ - long numberOfBytes; - /** - * IDs of compactions launched by this utility - */ - Set compactionIds = new HashSet<>(); + @Override + public void close() { + metaStoreClient.close(); } @VisibleForTesting diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java new file mode 100644 index 0000000000..e61acf1cde --- /dev/null +++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/RunOptions.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.upgrade.acid; + +import org.apache.commons.cli.CommandLine; + +public class RunOptions { + + public static RunOptions fromCommandLine(CommandLine commandLine) { + int defaultPoolSize = Runtime.getRuntime().availableProcessors() / 2; + if (defaultPoolSize < 1) + defaultPoolSize = 1; + + int databasePoolSize = getIntOptionValue(commandLine, "databasePoolSize", defaultPoolSize); + if (databasePoolSize < 1) + throw new IllegalArgumentException("Please specify a positive integer option value for databasePoolSize"); + + int tablePoolSize = getIntOptionValue(commandLine, "tablePoolSize", defaultPoolSize); + if (tablePoolSize < 1) + throw new IllegalArgumentException("Please specify a positive integer option value for tablePoolSize"); + + return new RunOptions( + commandLine.getOptionValue("location", "."), + commandLine.hasOption("execute"), + commandLine.getOptionValue("dbRegex", ".*"), + commandLine.getOptionValue("tableRegex", ".*"), + databasePoolSize, + tablePoolSize); + } + + private static int getIntOptionValue(CommandLine commandLine, String optionName, int defaultValue) { + if (commandLine.hasOption(optionName)) { + try { + return Integer.parseInt(commandLine.getOptionValue(optionName)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Please specify a positive integer option value for " + optionName, e); + } + } + return defaultValue; + } + + private final String outputDir; + private final boolean execute; + private final String dbRegex; + private final String tableRegex; + private final int databasePoolSize; + private final int tablePoolSize; + + private RunOptions(String outputDir, boolean execute, String dbRegex, String tableRegex, int databasePoolSize, int tablePoolSize) { + this.outputDir = outputDir; + this.execute = execute; + this.dbRegex = dbRegex; + this.tableRegex = tableRegex; + this.databasePoolSize = databasePoolSize; + this.tablePoolSize = tablePoolSize; + } + + public String getOutputDir() { + return outputDir; + } + + public boolean isExecute() { + return execute; + } + + public String getDbRegex() { + return dbRegex; + } + + public String getTableRegex() { + return tableRegex; + } + + public int getDatabasePoolSize() { + return databasePoolSize; + } + + public int getTablePoolSize() { + return tablePoolSize; + } + + @Override + public String toString() { + return "RunOptions{" + + "outputDir='" + outputDir + '\'' + + ", execute=" + execute + + ", dbRegex='" + dbRegex + '\'' + + ", tableRegex='" + tableRegex + '\'' + + ", databasePoolSize=" + databasePoolSize + + ", tablePoolSize=" + tablePoolSize + + '}'; + } +} diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocalTest.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocalTest.java new file mode 100644 index 0000000000..562bb345f3 --- /dev/null +++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/CloseableThreadLocalTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+public class CloseableThreadLocalTest {
+
+  private static class AutoCloseableStub implements AutoCloseable {
+
+    private boolean closed = false;
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+  }
+
+  @Test
+  public void testResourcesAreInitiallyNotClosed() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close,1);
+
+    assertThat(closeableThreadLocal.get().isClosed(), is(false));
+  }
+
+  @Test
+  public void testAfterCallingCloseAllInstancesAreClosed() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+
+    closeableThreadLocal.close();
+
+    assertThat(asyncInstance.isClosed(), is(true));
+    assertThat(syncInstance.isClosed(), is(true));
+  }
+
+  @Test
+  public void testSubsequentGetsInTheSameThreadGivesBackTheSameObject() {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub ref1 = closeableThreadLocal.get();
+    AutoCloseableStub ref2 = closeableThreadLocal.get();
+    assertThat(ref1, is(ref2));
+  }
+
+  @Test
+  public void testDifferentThreadsHasDifferentInstancesOfTheResource() throws ExecutionException, InterruptedException {
+    CloseableThreadLocal<AutoCloseableStub> closeableThreadLocal =
+        new CloseableThreadLocal<>(AutoCloseableStub::new, AutoCloseableStub::close, 2);
+
+    AutoCloseableStub asyncInstance = CompletableFuture.supplyAsync(closeableThreadLocal::get).get();
+    AutoCloseableStub syncInstance = closeableThreadLocal.get();
+    assertThat(asyncInstance, is(not(syncInstance)));
+  }
+}
\ No newline at end of file
diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/RunOptionsTest.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/RunOptionsTest.java
new file mode 100644
index 0000000000..3e22caec25
--- /dev/null
+++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/RunOptionsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.upgrade.acid; + +import static org.apache.hadoop.hive.upgrade.acid.PreUpgradeTool.createCommandLineOptions; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import org.apache.commons.cli.GnuParser; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class RunOptionsTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void testDatabasePoolSizeIs10WhenSpecified() throws Exception { + String[] args = {"-databasePoolSize", "10"}; + RunOptions runOptions = RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + assertThat(runOptions.getDatabasePoolSize(), is(10)); + } + + @Test + public void testExceptionIsThrownWhenDatabasePoolSizeIsNotANumber() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for databasePoolSize"); + + String[] args = {"-databasePoolSize", "notANumber"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenDatabasePoolSizeIsLessThan1() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for databasePoolSize"); + + String[] args = {"-databasePoolSize", "0"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenDatabasePoolSizeIsNotInteger() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for databasePoolSize"); + + String[] args = {"-databasePoolSize", "0.5"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testTablePoolSizeIs5WhenSpecified() throws Exception { + String[] args = {"-tablePoolSize", "5"}; + RunOptions runOptions = RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + assertThat(runOptions.getTablePoolSize(), is(5)); + } + + @Test + public void testExceptionIsThrownWhenTablePoolSizeIsNotANumber() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize"); + + String[] args = {"-tablePoolSize", "notANumber"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenTablePoolSizeIsLessThan1() throws Exception { + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize"); + + String[] args = {"-tablePoolSize", "0"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } + + @Test + public void testExceptionIsThrownWhenTablePoolSizeIsNotInteger() throws Exception { + 
expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("Please specify a positive integer option value for tablePoolSize"); + + String[] args = {"-tablePoolSize", "0.5"}; + RunOptions.fromCommandLine(new GnuParser().parse(createCommandLineOptions(), args)); + } +} \ No newline at end of file diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java index 90230d5dbd..e0fbdac80a 100644 --- a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java +++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java @@ -17,6 +17,25 @@ */ package org.apache.hadoop.hive.upgrade.acid; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringContains.containsString; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -30,12 +49,8 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -46,16 +61,6 @@ import org.junit.Test; import org.junit.rules.TestName; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - public class TestPreUpgradeTool { private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestPreUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis() @@ -117,13 +122,14 @@ void onWaitForCompaction() throws MetaException { PreUpgradeTool.pollIntervalMs = 1; PreUpgradeTool.hiveConf = hiveConf; PreUpgradeTool.main(args); - /* - todo: parse - target/tmp/org.apache.hadoop.hive.upgrade.acid.TestPreUpgradeTool-1527286256834/compacts_1527286277624.sql - make sure it's the only 'compacts' file and contains - ALTER TABLE default.tacid COMPACT 'major'; -ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major'; - * */ + + String[] scriptFiles = getScriptFiles(); + assertThat(scriptFiles.length, is(1)); + + List scriptContent = 
loadScriptContent(new File(getTestDataDir(), scriptFiles[0])); + assertThat(scriptContent.size(), is(2)); + assertThat(scriptContent, hasItem(is("ALTER TABLE default.tacid COMPACT 'major';"))); + assertThat(scriptContent, hasItem(is("ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major';"))); TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); @@ -133,11 +139,19 @@ void onWaitForCompaction() throws MetaException { Assert.assertEquals(e.toString(), TxnStore.CLEANING_RESPONSE, e.getState()); } - String[] args2 = {"-location", getTestDataDir()}; + // Check whether compaction was successful in the first run + File secondRunDataDir = new File(getTestDataDir(), "secondRun"); + if (!secondRunDataDir.exists()) { + if (!secondRunDataDir.mkdir()) + throw new IOException("Unable to create directory" + secondRunDataDir.getAbsolutePath()); + } + String[] args2 = {"-location", secondRunDataDir.getAbsolutePath()}; PreUpgradeTool.main(args2); - /* - * todo: parse compacts script - make sure there is nothing in it - * */ + + scriptFiles = secondRunDataDir.list(); + assertThat(scriptFiles, is(not(nullValue()))); + assertThat(scriptFiles.length, is(0)); + } finally { runStatementOnDriver("drop table if exists TAcid"); runStatementOnDriver("drop table if exists TAcidPart"); @@ -146,6 +160,113 @@ void onWaitForCompaction() throws MetaException { } } + private static final String INCLUDE_DATABASE_NAME ="DInclude"; + private static final String EXCLUDE_DATABASE_NAME ="DExclude"; + + @Test + public void testOnlyFilteredDatabasesAreUpgradedWhenRegexIsGiven() throws Exception { + int[][] data = {{1,2}, {3, 4}, {5, 6}}; + runStatementOnDriver("drop database if exists " + INCLUDE_DATABASE_NAME + " cascade"); + runStatementOnDriver("drop database if exists " + EXCLUDE_DATABASE_NAME + " cascade"); + + try { + runStatementOnDriver("create database " + INCLUDE_DATABASE_NAME); + runStatementOnDriver("use " + INCLUDE_DATABASE_NAME); + runStatementOnDriver( + "create table " + INCLUDE_TABLE_NAME + " (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into " + INCLUDE_TABLE_NAME + makeValuesClause(data)); + runStatementOnDriver("update " + INCLUDE_TABLE_NAME + " set a = 1 where b = 2"); + + runStatementOnDriver("create database " + EXCLUDE_DATABASE_NAME); + runStatementOnDriver("use " + EXCLUDE_DATABASE_NAME); + runStatementOnDriver( + "create table " + EXCLUDE_DATABASE_NAME + " (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into " + EXCLUDE_DATABASE_NAME + makeValuesClause(data)); + runStatementOnDriver("update " + EXCLUDE_DATABASE_NAME + " set a = 1 where b = 2"); + + String[] args = {"-location", getTestDataDir(), "-dbRegex", "*include*"}; + PreUpgradeTool.callback = new PreUpgradeTool.Callback() { + @Override + void onWaitForCompaction() throws MetaException { + runWorker(hiveConf); + } + }; + PreUpgradeTool.pollIntervalMs = 1; + PreUpgradeTool.hiveConf = hiveConf; + PreUpgradeTool.main(args); + + String[] scriptFiles = getScriptFiles(); + assertThat(scriptFiles.length, is(1)); + + List scriptContent = loadScriptContent(new File(getTestDataDir(), scriptFiles[0])); + assertThat(scriptContent.size(), is(1)); + assertThat(scriptContent.get(0), is("ALTER TABLE dinclude.tinclude COMPACT 'major';")); + + } finally { + runStatementOnDriver("drop database if exists " + INCLUDE_DATABASE_NAME + " cascade"); + runStatementOnDriver("drop database if exists 
" + EXCLUDE_DATABASE_NAME + " cascade"); + } + } + + private static final String INCLUDE_TABLE_NAME ="TInclude"; + private static final String EXCLUDE_TABLE_NAME ="TExclude"; + + @Test + public void testOnlyFilteredTablesAreUpgradedWhenRegexIsGiven() throws Exception { + int[][] data = {{1,2}, {3, 4}, {5, 6}}; + runStatementOnDriver("drop table if exists " + INCLUDE_TABLE_NAME); + runStatementOnDriver("drop table if exists " + EXCLUDE_TABLE_NAME); + + try { + runStatementOnDriver( + "create table " + INCLUDE_TABLE_NAME + " (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver( + "create table " + EXCLUDE_TABLE_NAME + " (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + + runStatementOnDriver("insert into " + INCLUDE_TABLE_NAME + makeValuesClause(data)); + runStatementOnDriver("update " + INCLUDE_TABLE_NAME + " set a = 1 where b = 2"); + + runStatementOnDriver("insert into " + EXCLUDE_TABLE_NAME + makeValuesClause(data)); + runStatementOnDriver("update " + EXCLUDE_TABLE_NAME + " set a = 1 where b = 2"); + + String[] args = {"-location", getTestDataDir(), "-tableRegex", "*include*"}; + PreUpgradeTool.callback = new PreUpgradeTool.Callback() { + @Override + void onWaitForCompaction() throws MetaException { + runWorker(hiveConf); + } + }; + PreUpgradeTool.pollIntervalMs = 1; + PreUpgradeTool.hiveConf = hiveConf; + PreUpgradeTool.main(args); + + String[] scriptFiles = getScriptFiles(); + assertThat(scriptFiles.length, is(1)); + + List scriptContent = loadScriptContent(new File(getTestDataDir(), scriptFiles[0])); + assertThat(scriptContent.size(), is(1)); + assertThat(scriptContent.get(0), allOf(containsString("ALTER TABLE"), containsString(INCLUDE_TABLE_NAME.toLowerCase()), containsString("COMPACT"))); + + } finally { + runStatementOnDriver("drop table if exists " + INCLUDE_TABLE_NAME); + runStatementOnDriver("drop table if exists " + EXCLUDE_TABLE_NAME); + } + } + + private String[] getScriptFiles() { + File testDataDir = new File(getTestDataDir()); + String[] scriptFiles = testDataDir.list((dir, name) -> name.startsWith("compacts_") && name.endsWith(".sql")); + assertThat(scriptFiles, is(not(nullValue()))); + return scriptFiles; + } + + private List loadScriptContent(File file) throws IOException { + List content = org.apache.commons.io.FileUtils.readLines(file); + content.removeIf(line -> line.startsWith("--")); + content.removeIf(StringUtils::isBlank); + return content; + } + @Test public void testUpgradeExternalTableNoReadPermissionForDatabase() throws Exception { int[][] data = {{1,2}, {3, 4}, {5, 6}};