diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 7818efbbf5..cd47a63e14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -296,7 +296,49 @@ public static Path createFilename(Path directory, } return createBucketFile(new Path(directory, subdir), options.getBucketId()); } - + /** + * Represents bucketId and copy_N suffix + */ + public static final class BucketMetaData { + private static final BucketMetaData INVALID = new BucketMetaData(-1, 0); + /** + * @param bucketFileName {@link #ORIGINAL_PATTERN} or {@link #ORIGINAL_PATTERN_COPY} + */ + public static BucketMetaData parse(String bucketFileName) { + if (ORIGINAL_PATTERN.matcher(bucketFileName).matches()) { + int bucketId = Integer + .parseInt(bucketFileName.substring(0, bucketFileName.indexOf('_'))); + return new BucketMetaData(bucketId, 0); + } + else if(ORIGINAL_PATTERN_COPY.matcher(bucketFileName).matches()) { + int copyNumber = Integer.parseInt( + bucketFileName.substring(bucketFileName.lastIndexOf('_') + 1)); + int bucketId = Integer + .parseInt(bucketFileName.substring(0, bucketFileName.indexOf('_'))); + return new BucketMetaData(bucketId, copyNumber); + } + else if (bucketFileName.startsWith(BUCKET_PREFIX)) { + return new BucketMetaData(Integer + .parseInt(bucketFileName.substring(bucketFileName.indexOf('_') + 1)), 0); + } + return INVALID; + } + public static BucketMetaData parse(Path bucketFile) { + return parse(bucketFile.getName()); + } + /** + * -1 if non-standard file name + */ + public final int bucketId; + /** + * 0 means no copy_N suffix + */ + public final int copyNumber; + private BucketMetaData(int bucketId, int copyNumber) { + this.bucketId = bucketId; + this.copyNumber = copyNumber; + } + } /** * Get the write id from a base directory name. * @param path the base directory name diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java index dfd4452ad4..bdd16c532d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java @@ -105,7 +105,7 @@ public int encode(AcidOutputFormat.Options options) { private static final int NUM_BUCKET_ID_BITS = 12; private static final int NUM_STATEMENT_ID_BITS = 12; private static final int MAX_VERSION = (1 << NUM_VERSION_BITS) - 1; - private static final int MAX_BUCKET_ID = (1 << NUM_BUCKET_ID_BITS) - 1; + public static final int MAX_BUCKET_ID = (1 << NUM_BUCKET_ID_BITS) - 1; private static final int MAX_STATEMENT_ID = (1 << NUM_STATEMENT_ID_BITS) - 1; public static BucketCodec determineVersion(int bucket) { diff --git ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java new file mode 100644 index 0000000000..c523a76659 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java @@ -0,0 +1,677 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.FileUtils.RemoteIteratorWithFilter; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.common.util.HiveVersionInfo; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + + +/** + * A new type of transactional tables was added in 3.0 - insert-only tables. These + * tables support ACID semantics and work with any Input/OutputFormat. Any Managed tables may + * be made insert-only transactional table. These tables don't support Update/Delete/Merge commands. + * + * In postUpgrade mode, Hive 3.0 jars/hive-site.xml should be on the classpath. This utility will + * find all the tables that may be made transactional (with ful CRUD support) and generate + * Alter Table commands to do so. 
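+ * For example, a table that qualifies for full CRUD is expected to end up with a command of
+ * the form "ALTER TABLE T SET TBLPROPERTIES ('transactional'='true')" (T is a placeholder
+ * table name; see processConversion() below for the exact strings that are generated).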
It will also find all tables that may do not support full CRUD + * but can be made insert-only transactional tables and generate corresponding Alter Table commands. + * + * Note that to convert a table to full CRUD table requires that all files follow a naming + * convention, namely 0000N_0 or 0000N_0_copy_M, N >= 0, M > 0. This utility can perform this + * rename with "execute" option. It will also produce a script (with and w/o "execute" to + * perform the renames). + * + * "execute" option may be supplied in both modes to have the utility automatically execute the + * equivalent of the generated commands + * + * "location" option may be supplied followed by a path to set the location for the generated + * scripts. + * + * See also org.apache.hadoop.hive.upgrade.acid.PreUpgradeTool for steps which may be necessary to + * perform before upgrading to Hive 3. + */ +public class UpgradeTool { + private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class); + private static final int PARTITION_BATCH_SIZE = 10000; + private final Options cmdLineOptions = new Options(); + + public static void main(String[] args) throws Exception { + UpgradeTool tool = new UpgradeTool(); + tool.init(); + CommandLineParser parser = new GnuParser(); + CommandLine line ; + String outputDir = "."; + boolean execute = false; + try { + line = parser.parse(tool.cmdLineOptions, args); + } catch (ParseException e) { + System.err.println("UpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage()); + printAndExit(tool); + return; + } + if (line.hasOption("help")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("upgrade-acid", tool.cmdLineOptions); + return; + } + if(line.hasOption("location")) { + outputDir = line.getOptionValue("location"); + } + if(line.hasOption("execute")) { + execute = true; + } + LOG.info("Starting with execute=" + execute + ", location=" + outputDir); + + try { + String hiveVer = HiveVersionInfo.getShortVersion(); + LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " + + HiveVersionInfo.getBuildVersion()); + if(!(hiveVer.startsWith("3.") || hiveVer.startsWith("4."))) { + throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " + + hiveVer); + } + tool.performUpgradeInternal(outputDir, execute); + } + catch(Exception ex) { + LOG.error("UpgradeTool failed", ex); + throw ex; + } + } + private static void printAndExit(UpgradeTool tool) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("upgrade-acid", tool.cmdLineOptions); + System.exit(1); + } + + private void init() { + try { + cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 3.x " + + "cluster. This requires 3.x binaries on the classpath and hive-site.xml.")); + Option exec = new Option("execute", + "Executes commands equivalent to generated scrips"); + exec.setOptionalArg(true); + cmdLineOptions.addOption(exec); + cmdLineOptions.addOption(new Option("location", true, + "Location to write scripts to. Default is CWD.")); + } + catch(Exception ex) { + LOG.error("init()", ex); + throw ex; + } + } + private static IMetaStoreClient getHMS(HiveConf conf) { + UserGroupInformation loggedInUser = null; + try { + loggedInUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + LOG.warn("Unable to get logged in user via UGI. 
err: {}", e.getMessage()); + } + boolean secureMode = loggedInUser != null && loggedInUser.hasKerberosCredentials(); + if (secureMode) { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.USE_THRIFT_SASL, true); + } + try { + LOG.info("Creating metastore client for {}", "PreUpgradeTool"); + return RetryingMetaStoreClient.getProxy(conf, true); + } catch (MetaException e) { + throw new RuntimeException("Error connecting to Hive Metastore URI: " + + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); + } + } + /** + * todo: this should accept a file of table names to exclude from non-acid to acid conversion + * todo: change script comments to a preamble instead of a footer + */ + private void performUpgradeInternal(String scriptLocation, boolean execute) + throws HiveException, TException, IOException { + HiveConf conf = hiveConf != null ? hiveConf : new HiveConf(); + boolean isAcidEnabled = isAcidEnabled(conf); + IMetaStoreClient hms = getHMS(conf); + LOG.debug("Looking for databases"); + List databases = hms.getAllDatabases();//TException + LOG.debug("Found " + databases.size() + " databases to process"); + List convertToAcid = new ArrayList<>(); + List convertToMM = new ArrayList<>(); + Hive db = null; + if(execute) { + db = Hive.get(conf); + } + PrintWriter pw = makeRenameFileScript(scriptLocation); + + for(String dbName : databases) { + List tables = hms.getAllTables(dbName); + LOG.debug("found " + tables.size() + " tables in " + dbName); + for(String tableName : tables) { + Table t = hms.getTable(dbName, tableName); + LOG.debug("processing table " + Warehouse.getQualifiedName(t)); + if(isAcidEnabled) { + //if acid is off post upgrade, you can't make any tables acid - will throw + processConversion(t, convertToAcid, convertToMM, hms, db, execute, pw); + } + } + } + pw.close(); + makeConvertTableScript(convertToAcid, convertToMM, scriptLocation); + } + + /** + * Actually makes the table transactional + */ + private static void alterTable(Table t, Hive db, boolean isMM) + throws HiveException, InvalidOperationException { + org.apache.hadoop.hive.ql.metadata.Table metaTable = + //clone to make sure new prop doesn't leak + new org.apache.hadoop.hive.ql.metadata.Table(t.deepCopy()); + metaTable.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + if(isMM) { + metaTable.getParameters() + .put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only"); + } + EnvironmentContext ec = new EnvironmentContext(); + /*we are not modifying any data so stats should be exactly the same*/ + ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, ec, false); + } + + /** + * assumes https://issues.apache.org/jira/browse/HIVE-19750 is in + * How does this work with Storage Based Auth? + * @param p partition root or table root if not partitioned + */ + static void handleRenameFiles(Table t, Path p, boolean execute, Configuration conf, + boolean isBucketed, PrintWriter pw) throws IOException { + AcidUtils.BUCKET_DIGIT_PATTERN.matcher("foo"); + if (isBucketed) { + /* For bucketed tables we assume that Hive wrote them and 0000M_0 and 0000M_0_copy_8 + are the only possibilities. Since we can't move files across buckets the only thing we + can do is put 0000M_0_copy_N into delta_N_N as 0000M_0. + + If M > 4096 - should error out - better yet, make this table external one - can those + be bucketed? 
don't think so + */ + //Known deltas + Map> deltaToFileMap = new HashMap<>(); + FileSystem fs = FileSystem.get(conf); + RemoteIteratorWithFilter iter = + new RemoteIteratorWithFilter(fs.listFiles(p, true), RemoteIteratorWithFilter.HIDDEN_FILES_FULL_PATH_FILTER); + Function> makeList = new Function>() {//lambda? + @Override + public List apply(Integer aVoid) { + return new ArrayList<>(); + } + }; + while (iter.hasNext()) { + LocatedFileStatus lfs = iter.next(); + if (lfs.isDirectory()) { + String msg = Warehouse.getQualifiedName(t) + " is bucketed and has a subdirectory: " + + lfs.getPath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + AcidUtils.BucketMetaData bmd = AcidUtils.BucketMetaData.parse(lfs.getPath()); + if (bmd.bucketId < 0) { + //non-standard file name - don't know what bucket the rows belong to and we can't + //rename the file so tha it may end up treated like a different bucket id + String msg = "Bucketed table " + Warehouse.getQualifiedName(t) + " contains file " + + lfs.getPath() + " with non-standard name"; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } else { + if (bmd.bucketId > BucketCodec.MAX_BUCKET_ID) { + String msg = "Bucketed table " + Warehouse.getQualifiedName(t) + " contains file " + + lfs.getPath() + " with bucketId=" + bmd.bucketId + " that is out of range"; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + if (bmd.copyNumber > 0) { + deltaToFileMap.computeIfAbsent(bmd.copyNumber, makeList).add(lfs.getPath()); + } + } + } + if(!deltaToFileMap.isEmpty()) { + println(pw, "#Begin file renames for bucketed table " + Warehouse.getQualifiedName(t)); + } + for (Map.Entry> ent : deltaToFileMap.entrySet()) { + /* create delta and move each files to it. HIVE-19750 ensures wer have reserved + * enough write IDs to do this.*/ + Path deltaDir = new Path(p, AcidUtils.deltaSubdir(ent.getKey(), ent.getKey())); + if (execute) { + if (!fs.mkdirs(deltaDir)) { + String msg = "Failed to create directory " + deltaDir; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + // Add to list of FS commands + makeDirectoryCommand(deltaDir, pw); + + for (Path file : ent.getValue()) { + Path newFile = new Path(deltaDir, stripCopySuffix(file.getName())); + LOG.debug("need to rename: " + file + " to " + newFile); + if (fs.exists(newFile)) { + String msg = Warehouse.getQualifiedName(t) + ": " + newFile + " already exists?!"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + if (execute) { + if (!fs.rename(file, newFile)) { + String msg = Warehouse.getQualifiedName(t) + ": " + newFile + ": failed to rename"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + //do this with and w/o execute to know what was done + makeRenameCommand(file, newFile, pw); + } + } + if(!deltaToFileMap.isEmpty()) { + println(pw, "#End file renames for bucketed table " + Warehouse.getQualifiedName(t)); + } + return; + } + List renames = new ArrayList<>(); + FileSystem fs = FileSystem.get(conf); + RemoteIteratorWithFilter iter = + new RemoteIteratorWithFilter(fs.listFiles(p, true), RemoteIteratorWithFilter.HIDDEN_FILES_FULL_PATH_FILTER); + /** + * count some heuristics - bad file is something not in {@link AcidUtils#ORIGINAL_PATTERN} or + * {@link AcidUtils#ORIGINAL_PATTERN_COPY} format. This has to be renamed for acid to work. + */ + int numBadFileNames = 0; + /** + * count some heuristics - num files in {@link AcidUtils#ORIGINAL_PATTERN_COPY} format. 
These + * are supported but if there are a lot of them there will be a perf hit on read until + * major compaction + */ + int numCopyNFiles = 0; + int fileId = 0;//ordinal of the file in the iterator + long numBytesInPartition = getDataSize(p, conf); + int numBuckets = guessNumBuckets(numBytesInPartition); + while (iter.hasNext()) { + LocatedFileStatus lfs = iter.next(); + if(lfs.isDirectory()) { + continue; + } + AcidUtils.BucketMetaData bmd = AcidUtils.BucketMetaData.parse(lfs.getPath()); + if(bmd.bucketId < 0) { + numBadFileNames++; + } + if(bmd.copyNumber > 0) { + //todo: what about same file name in subdir like Union All? ROW_ID generation will handle it + //but will have to look at ORC footers - treat these as copyN files? + numCopyNFiles++; + } + int wrtieId = fileId / numBuckets + 1;//start with delta_1 (not delta_0) + Path deltaDir = new Path(p, AcidUtils.deltaSubdir(wrtieId, wrtieId)); + if (execute) { + if (!fs.mkdirs(deltaDir)) { + String msg = "Failed to create directory " + deltaDir; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + // Add to list of FS commands + makeDirectoryCommand(deltaDir, pw); + + Path newPath = + new Path(deltaDir, String.format(AcidUtils.BUCKET_DIGITS, fileId % numBuckets)+ "_0"); + /*we could track reason for rename in RenamePair so that the decision can be made later to + rename or not. For example, if we need to minimize renames (say we are on S3), then we'd + only rename if it's absolutely required, i.e. if it's a 'bad file name'*/ + renames.add(new RenamePair(lfs.getPath(), newPath)); + fileId++; + } + if(numBadFileNames <= 0 && numCopyNFiles <=0) { + //if here, the only reason we'd want to rename is to spread the data into logical buckets to + //help 3.0 Compactor generated more balanced splits + return; + } + if(!renames.isEmpty()) { + println(pw, "#Begin file renames for unbucketed table " + Warehouse.getQualifiedName(t)); + } + for(RenamePair renamePair : renames) { + LOG.debug("need to rename: " + renamePair.getOldPath() + " to " + renamePair.getNewPath()); + if (fs.exists(renamePair.getNewPath())) { + String msg = Warehouse.getQualifiedName(t) + ": " + renamePair.getNewPath() + + " already exists?!"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + if (execute) { + if (!fs.rename(renamePair.getOldPath(), renamePair.getNewPath())) { + String msg = Warehouse.getQualifiedName(t) + ": " + renamePair.getNewPath() + + ": failed to rename"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + //do this with and w/o execute to know what was done + makeRenameCommand(renamePair.getOldPath(), renamePair.getNewPath(), pw); + } + if(!renames.isEmpty()) { + println(pw, "#End file renames for unbucketed table " + Warehouse.getQualifiedName(t)); + } + } + private static void makeRenameCommand(Path file, Path newFile, PrintWriter pw) { + //https://hadoop.apache.org/docs/r3.0.0-alpha2/hadoop-project-dist/hadoop-common/FileSystemShell.html#mv + println(pw, "hadoop fs -mv " + file + " " + newFile + ";"); + } + private static void makeDirectoryCommand(Path dir, PrintWriter pw) { + println(pw, "hadoop fs -mkdir " + dir + ";"); + } + + private static void println(PrintWriter pw, String msg) { + if (pw != null) { + pw.println(msg); + } + } + + /** + * Need better impl to be more memory efficient - there could be a lot of these objects. 
+ * For example, remember partition root Path elsewhere, + * and have this object remember relative path to old file and bucketid/deletaid of new one + */ + private static final class RenamePair { + private Path oldPath; + private Path newPath; + private RenamePair(Path old, Path newPath) { + this.oldPath = old; + this.newPath = newPath; + } + private Path getOldPath() { + return oldPath; + } + private Path getNewPath() { + return newPath; + } + } + + /** + * @param location - path to a partition (or table if not partitioned) dir + */ + private static long getDataSize(Path location, Configuration conf) throws IOException { + FileSystem fs = location.getFileSystem(conf); + ContentSummary cs = fs.getContentSummary(location); + return cs.getLength(); + } + + /** + * @param fileName - matching {@link AcidUtils#ORIGINAL_PATTERN_COPY} + */ + private static String stripCopySuffix(String fileName) { + //0000_0_copy_N -> 0000_0 + return fileName.substring(0, fileName.indexOf('_', 1 + fileName.indexOf('_', 0))); + } + + /** + * Since current compactor derives its parallelism from file names, we need to name files in + * a way to control this parallelism. This should be a function of data size. + * @param partitionSizeInBytes + * @return cannot exceed 4096 + */ + public static int guessNumBuckets(long partitionSizeInBytes) { + long OneGB = 1000000000; + if(partitionSizeInBytes <= 1000000000) { + return 1;//1 bucket + } + if(partitionSizeInBytes <= 100 * OneGB) { + return 8; + } + if(partitionSizeInBytes <= 1000 * OneGB) {//TB + return 16; + } + if(partitionSizeInBytes <= 10 * 1000 * OneGB) {//10 TB + return 32; + } + if(partitionSizeInBytes <= 100 * 1000 * OneGB) {//100TB + return 64; + } + if(partitionSizeInBytes <= 1000 * 1000 * OneGB) {//PT + return 128; + } + if(partitionSizeInBytes <= 10 * 1000* 1000 * OneGB) {//10 PT + return 256; + } + if(partitionSizeInBytes <= 100 * 1000 * 1000 * OneGB) {//100 PT + return 512; + } + if(partitionSizeInBytes <= 1000 * 1000 *1000 * OneGB) {//1000 PT + return 1024; + } + return 2048; + } + /** + * todo: handle exclusion list + * Figures out which tables to make Acid, MM and (optionally, performs the operation) + */ + private static void processConversion(Table t, List convertToAcid, + List convertToMM, IMetaStoreClient hms, Hive db, boolean execute, PrintWriter pw) + throws TException, HiveException, IOException { + if(isFullAcidTable(t)) { + return; + } + if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) { + return; + } + //todo: are HBase, Druid talbes managed in 2.x? 3.0? + String fullTableName = Warehouse.getQualifiedName(t); + /* + * ORC uses table props for settings so things like bucketing, I/O Format, etc should + * be the same for each partition. 
+ */ + boolean canBeMadeAcid = canBeMadeAcid(fullTableName, t.getSd()); + if(t.getPartitionKeysSize() <= 0) { + if(canBeMadeAcid) { + convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + + "'transactional'='true')"); + //do this before alterTable in case files need to be renamed, else + // TransactionalMetastoreListerner will squak + handleRenameFiles(t, new Path(t.getSd().getLocation()), execute, db.getConf(), + t.getSd().getBucketColsSize() > 0, pw); + if(execute) { + alterTable(t, db, false); + } + } + else { + convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + + "'transactional'='true', 'transactional_properties'='insert_only')"); + if(execute) { + alterTable(t, db, true); + } + } + } + else { + if(!canBeMadeAcid) { + convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + + "'transactional'='true', 'transactional_properties'='insert_only')"); + if(execute) { + alterTable(t, db, true); + } + return; + } + //now that we know it can be made acid, rename files as needed + //process in batches in case there is a huge # of partitions + List partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1); + int batchSize = PARTITION_BATCH_SIZE; + int numWholeBatches = partNames.size()/batchSize; + for(int i = 0; i < numWholeBatches; i++) { + List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), + partNames.subList(i * batchSize, (i + 1) * batchSize)); + for(Partition part : partitionList) { + handleRenameFiles(t, new Path(part.getSd().getLocation()), execute, db.getConf(), + t.getSd().getBucketColsSize() > 0, pw); + } + } + if(numWholeBatches * batchSize < partNames.size()) { + //last partial batch + List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), + partNames.subList(numWholeBatches * batchSize, partNames.size())); + for(Partition part : partitionList) { + handleRenameFiles(t, new Path(part.getSd().getLocation()), execute, db.getConf(), + t.getSd().getBucketColsSize() > 0, pw); + } + } + //if here, handled all parts and they are no wAcid compatible - make it acid + convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + + "'transactional'='true')"); + if(execute) { + alterTable(t, db, false); + } + } + } + private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) { + return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0; + } + private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) { + try { + Class inputFormatClass = sd.getInputFormat() == null ? null : + Class.forName(sd.getInputFormat()); + Class outputFormatClass = sd.getOutputFormat() == null ? 
null : + Class.forName(sd.getOutputFormat()); + + if (inputFormatClass != null && outputFormatClass != null && + Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat") + .isAssignableFrom(inputFormatClass) && + Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat") + .isAssignableFrom(outputFormatClass)) { + return true; + } + } catch (ClassNotFoundException e) { + //if a table is using some custom I/O format and it's not in the classpath, we won't mark + //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only + //Acid format + LOG.error("Could not determine if " + fullTableName + + " can be made Acid due to: " + e.getMessage(), e); + return false; + } + return false; + } + private static void makeConvertTableScript(List alterTableAcid, List alterTableMm, + String scriptLocation) throws IOException { + if (alterTableAcid.isEmpty()) { + LOG.info("No acid conversion is necessary"); + } else { + String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql"; + LOG.debug("Writing CRUD conversion commands to " + fileName); + try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) { + pw.println("-- These commands may be executed by Hive 3.x later"); + } + } + + if (alterTableMm.isEmpty()) { + LOG.info("No managed table conversion is necessary"); + } else { + String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql"; + LOG.debug("Writing managed table conversion commands to " + fileName); + try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) { + pw.println("-- These commands must be executed by Hive 3.0 or later"); + } + } + } + + private static PrintWriter createScript(List commands, String fileName, + String scriptLocation) throws IOException { + FileWriter fw = new FileWriter(scriptLocation + "/" + fileName); + PrintWriter pw = new PrintWriter(fw); + for(String cmd : commands) { + pw.println(cmd + ";"); + } + return pw; + } + private static PrintWriter makeRenameFileScript(String scriptLocation) throws IOException { + String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh"; + LOG.info("Writing file renaming commands to " + fileName); + return createScript(Collections.emptyList(), fileName, scriptLocation); + } + private static boolean isFullAcidTable(Table t) { + if (t.getParametersSize() <= 0) { + //cannot be acid + return false; + } + String transacationalValue = t.getParameters() + .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (transacationalValue != null && "true".equalsIgnoreCase(transacationalValue)) { + System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t)); + return true; + } + return false; + } + private static boolean isAcidEnabled(HiveConf hiveConf) { + String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); + boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + return txnMgr.equals(dbTxnMgr) && concurrency; + } + /** + * can set it from tests to test when config needs something other than default values + * For example, that acid is enabled + */ + @VisibleForTesting + static HiveConf hiveConf = null; +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java index 589e3b7693..65601e447e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java @@ -56,7 +56,7 @@ public TemporaryFolder folder = new TemporaryFolder(); @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index e9d9f9c06e..536281d555 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -91,7 +91,7 @@ + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index 66a4a88a51..89ab043313 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -82,7 +82,7 @@ ).getPath().replaceAll("\\\\", "/"); protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index 5b8ff153ae..a5bd1cbd67 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -45,7 +45,7 @@ public TemporaryFolder folder = new TemporaryFolder(); @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java index ad4ed76901..333126b6a5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -39,7 +39,7 @@ ).getPath().replaceAll("\\\\", "/"); @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index 45f9e522f2..230e93e814 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -47,7 +47,7 @@ public TemporaryFolder folder= new TemporaryFolder(); @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 2c98e3c906..948243609f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -54,7 +54,7 @@ @Rule public TestName testName = new TestName(); @Override - String getTestDataDir() { + protected String getTestDataDir() { return TEST_DATA_DIR; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index a2fafca9a1..05ce3e214d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -49,7 +49,7 @@ final static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); - HiveConf 
hiveConf; + protected HiveConf hiveConf; Driver d; enum Table { ACIDTBL("acidTbl"), @@ -138,10 +138,10 @@ public void tearDown() throws Exception { FileUtils.deleteDirectory(new File(getTestDataDir())); } } - String getWarehouseDir() { + protected String getWarehouseDir() { return getTestDataDir() + "/warehouse"; } - abstract String getTestDataDir(); + protected abstract String getTestDataDir(); /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order @@ -149,7 +149,7 @@ String getWarehouseDir() { List stringifyValues(int[][] rowsIn) { return TestTxnCommands2.stringifyValues(rowsIn); } - String makeValuesClause(int[][] rows) { + protected String makeValuesClause(int[][] rows) { return TestTxnCommands2.makeValuesClause(rows); } @@ -161,7 +161,7 @@ void runCleaner(HiveConf hiveConf) throws MetaException { TestTxnCommands2.runCleaner(hiveConf); } - List runStatementOnDriver(String stmt) throws Exception { + protected List runStatementOnDriver(String stmt) throws Exception { LOG.info("Running the query: " + stmt); CommandProcessorResponse cpr = d.run(stmt); if(cpr.getResponseCode() != 0) { @@ -244,7 +244,7 @@ void logResult(Logger LOG, List rs) { * which will currently make the query non-vectorizable. This means we can't check the file name * for vectorized version of the test. */ - void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG) throws Exception{ + protected void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG) throws Exception{ List rs = runStatementOnDriver(query); checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); assertVectorized(isVectorized, query); diff --git ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java new file mode 100644 index 0000000000..fa6f1ca7d9 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class TestUpgradeTool extends TxnCommandsBaseForTests { + private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Override + protected String getTestDataDir() { + return TEST_DATA_DIR; + } + + /** + * includes 'execute' for postUpgrade + */ + @Test + public void testPostUpgrade() throws Exception { + int[][] data = {{1, 2}, {3, 4}, {5, 6}}; + int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "dynamic"); + runStatementOnDriver("drop table if exists TAcid"); + runStatementOnDriver("drop table if exists TAcidPart"); + runStatementOnDriver("drop table if exists TFlat"); + runStatementOnDriver("drop table if exists TFlatText"); + + //should be converted to Acid + runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("insert into TAcid" + makeValuesClause(data)); + runStatementOnDriver("insert into TAcid" + makeValuesClause(data));//should now be copy_1 + runStatementOnDriver("insert into TAcid" + makeValuesClause(data));//should now be copy_2 + + //should be converted to Acid + runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int)" + + " clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + //to create some partitions + runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart)); + //and copy_1 files + runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart)); + + //should be converted to Acid + //todo add some files with non-standard names + runStatementOnDriver("create table TFlat (a int, b int) stored as orc " + + "tblproperties('transactional'='false')"); + runStatementOnDriver("insert into TFlat values(1,2)");//create 0000_0 + runStatementOnDriver("insert into TFlat values(2,3)");//create 0000_0_copy_1 + runStatementOnDriver("insert into TFlat values(3,4)");//create 0000_0_copy_2 + runStatementOnDriver("insert into TFlat values(4,5)");//create 0000_0_copy_3 + runStatementOnDriver("insert into TFlat values(5,6)");//create 0000_0_copy_4 + runStatementOnDriver("insert into TFlat values(6,7)");//create 0000_0_copy_5 + + + /* + ├── 000000_0 + ├── 000000_0_copy_1 + ├── 000000_0_copy_2 + ├── 000000_0_copy_3 + └── 000000_0_copy_4 + └── 000000_0_copy_5 + + to + + ├── 000000_0 + ├── 000000_0_copy_2 + ├── 1 + │   └── 000000_0 + ├── 2 + │   └── 000000_0 + └── subdir + | └── part-0001 + |--.hive-staging_hive_2018-07-04_11-12-18_760_5286422535984490754-1395/000000_0 + +*/ + FileSystem fs = FileSystem.get(hiveConf); + //simulate Spark (non-Hive) write + fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_1"), + new Path(getWarehouseDir() + "/tflat/subdir/part-0001")); + 
//simulate Insert ... Select ... Union All... + fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_3"), + new Path(getWarehouseDir() + "/tflat/1/000000_0")); + fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_4"), + new Path(getWarehouseDir() + "/tflat/2/000000_0")); + fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_5"), + new Path(getWarehouseDir() + "/tflat/.hive-staging_hive_2018-07-04_11-12-18_760_5286422535984490754-1395/000000_0")); + + String testQuery0 = "select a, b from TFlat order by a"; + String[][] expected0 = new String[][] { + {"1\t2",""}, + {"2\t3",""}, + {"3\t4",""}, + {"4\t5",""}, + {"5\t6",""}, + }; + checkResult(expected0, testQuery0, true, "TFlat pre-check", LOG); + + + //should be converted to MM + runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile " + + "tblproperties('transactional'='false')"); + + Hive db = Hive.get(hiveConf); + org.apache.hadoop.hive.ql.metadata.Table tacid = db.getTable("default", "tacid"); + Assert.assertEquals("Expected TAcid to not be full acid", false, + AcidUtils.isFullAcidTable(tacid)); + org.apache.hadoop.hive.ql.metadata.Table tacidpart = db.getTable("default", "tacidpart"); + Assert.assertEquals("Expected TAcidPart to not be full acid", false, + AcidUtils.isFullAcidTable(tacidpart)); + + org.apache.hadoop.hive.ql.metadata.Table t = db.getTable("default", "tflat"); + Assert.assertEquals("Expected TAcid to not be full acid", false, + AcidUtils.isFullAcidTable(t)); + t = db.getTable("default", "tflattext"); + Assert.assertEquals("Expected TAcidPart to not be full acid", false, + AcidUtils.isInsertOnlyTable(tacidpart)); + + + String[] args2 = {"-location", getTestDataDir(), "-execute"}; + UpgradeTool.hiveConf = hiveConf; + UpgradeTool.main(args2); + + tacid = db.getTable("default", "tacid"); + Assert.assertEquals("Expected TAcid to become full acid", true, + AcidUtils.isFullAcidTable(tacid)); + tacidpart = db.getTable("default", "tacidpart"); + Assert.assertEquals("Expected TAcidPart to become full acid", true, + AcidUtils.isFullAcidTable(tacidpart)); + + t = db.getTable("default", "tflat"); + Assert.assertEquals("Expected TAcid to become acid", true, AcidUtils.isFullAcidTable(t)); + t = db.getTable("default", "tflattext"); + Assert.assertEquals("Expected TAcidPart to become MM", true, + AcidUtils.isInsertOnlyTable(t)); + + /*make sure we still get the same data and row_ids are assigned and deltas are as expected: + * each set of copy_N goes into matching delta_N_N.*/ + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from TAcid order by a, b, ROW__ID"; + String[][] expected = new String[][] { + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2", + "tacid/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2", + "tacid/delta_0000001_0000001/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t2", + "tacid/delta_0000002_0000002/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t3\t4", + "tacid/000001_0"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", + "tacid/delta_0000001_0000001/000001_0"}, + {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t3\t4", + "tacid/delta_0000002_0000002/000001_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + "tacid/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + "tacid/delta_0000001_0000001/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + "tacid/delta_0000002_0000002/000000_0"} + }; 
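+    /* Note on the bucketid values in the expected ROW__IDs above - an interpretation, assuming
+     * the BucketCodec.V1 layout (3 version bits at the top, 1 reserved bit, 12 bucketId bits,
+     * 4 reserved bits, 12 statementId bits):
+     *   bucket 0 -> 1 << 29               = 536870912
+     *   bucket 1 -> (1 << 29) | (1 << 16) = 536936448
+     */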
+ checkResult(expected, testQuery, false, "TAcid post-check", LOG); + + + testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from TAcidPart order by a, b, p, ROW__ID"; + String[][] expected2 = new String[][] { + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "warehouse/tacidpart/p=10/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "tacidpart/p=10/delta_0000001_0000001/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t3\t4", + "tacidpart/p=11/000001_0"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", + "tacidpart/p=11/delta_0000001_0000001/000001_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + "tacidpart/p=12/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + "tacidpart/p=12/delta_0000001_0000001/000000_0"} + }; + checkResult(expected2, testQuery, false, "TAcidPart post-check", LOG); + + /* Verify that we re-arranged/renamed so that files names follow hive naming convention + and are spread among deltas/buckets + The order of files in RemoteIterator iter = fs.listFiles(p, true) + is what determines which delta/file any original file ends up in + + The test is split into 2 parts to test data and metadata because RemoteIterator walks in + different order on different machines*/ + + testQuery = "select a, b from TFlat order by a"; + String[][] expectedData = new String[][] { + {"1\t2"}, + {"2\t3"}, + {"3\t4"}, + {"4\t5"}, + {"5\t6"} + }; + checkResult(expectedData, testQuery, true, "TFlat post-check data", LOG); + + testQuery = "select ROW__ID, INPUT__FILE__NAME from TFlat order by INPUT__FILE__NAME"; + String[][] expectedMetaData = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}", + "tflat/delta_0000001_0000001/00000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}", + "tflat/delta_0000002_0000002/00000_0"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}", + "tflat/delta_0000003_0000003/00000_0"}, + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}", + "tflat/delta_0000004_0000004/00000_0"}, + {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}", + "tflat/delta_0000005_0000005/00000_0"} + }; + checkResult(expectedMetaData, testQuery, false, "TFlat post-check files", LOG); + } + @Test + public void testGuessNumBuckets() { + Assert.assertEquals(1, UpgradeTool.guessNumBuckets(123)); + Assert.assertEquals(1, UpgradeTool.guessNumBuckets(30393930)); + Assert.assertEquals(1, UpgradeTool.guessNumBuckets((long) Math.pow(10, 9))); + Assert.assertEquals(32, UpgradeTool.guessNumBuckets((long) Math.pow(10, 13)));//10 TB + Assert.assertEquals(128, UpgradeTool.guessNumBuckets((long) Math.pow(10, 15)));//PB + } +} diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java index 154db4b17d..155ecb18bf 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java @@ -23,8 +23,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.Trash; import 
org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -38,6 +40,7 @@ import java.util.BitSet; import java.util.Collections; import java.util.List; +import java.util.NoSuchElementException; public class FileUtils { private static final PathFilter SNAPSHOT_DIR_PATH_FILTER = new PathFilter() { @@ -534,4 +537,61 @@ public static Path getTransformedPath(String name, String subDir, String root) { } return null; } + public static class RemoteIteratorWithFilter implements RemoteIterator { + /** + * This works with {@link RemoteIterator} which (potentially) produces all files recursively + * so looking for hidden folders must look at whole path, not just the the last part of it as + * would be appropriate w/o recursive listing. + */ + public static final PathFilter HIDDEN_FILES_FULL_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + do { + String name = p.getName(); + if (name.startsWith("_") || name.startsWith(".")) { + return false; + } + } while ((p = p.getParent()) != null); + return true; + } + }; + private final RemoteIterator iter; + private final PathFilter filter; + private LocatedFileStatus nextFile; + + public RemoteIteratorWithFilter(RemoteIterator iter, PathFilter filter) + throws IOException { + this.iter = iter; + this.filter = filter; + findNext(); + } + + @Override + public boolean hasNext() throws IOException { + return nextFile != null; + } + + @Override + public LocatedFileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + LocatedFileStatus result = nextFile; + findNext(); + return result; + } + + void findNext() throws IOException { + while (iter.hasNext()) { + LocatedFileStatus status = iter.next(); + if (filter.accept(status.getPath())) { + nextFile = status; + return; + } + } + + // No more matching files in the iterator + nextFile = null; + } + } } diff --git upgrade-acid/pom.xml upgrade-acid/pom.xml index a0325488ac..a1dcc60e5f 100644 --- upgrade-acid/pom.xml +++ upgrade-acid/pom.xml @@ -17,7 +17,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - + org.apache apache 18 @@ -29,7 +32,7 @@ 4.0.0-SNAPSHOT hive-upgrade-acid Hive Upgrade Acid - jar + pom @@ -47,250 +50,9 @@ ${basedir}/checkstyle/ 2.17 2.20.1 - - - ${project.build.directory}/testconf - file:// - ${project.basedir}/src/test/resources - ${project.build.directory}/tmp - ${project.build.directory}/warehouse - file:// - 1 - true - - - - commons-cli - commons-cli - 1.2 - provided - - - org.apache.hive - hive-metastore - 2.3.3 - provided - - - org.apache.hive - hive-exec - 2.3.3 - provided - - - org.apache.hadoop - hadoop-common - 2.7.2 - provided - - - - org.apache.hadoop - hadoop-mapreduce-client-common - 2.7.2 - provided - - - org.apache.orc - orc-core - 1.3.3 - provided - - - - - - - - ${basedir}/src/main/resources - - package.jdo - - - + + pre-upgrade + - - - - org.apache.maven.plugins - maven-antrun-plugin - ${maven.antrun.plugin.version} - - - ant-contrib - ant-contrib - ${ant.contrib.version} - - - ant - ant - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - ${maven.checkstyle.plugin.version} - - - org.codehaus.mojo - exec-maven-plugin - ${maven.exec.plugin.version} - - - - - - - org.apache.maven.plugins - maven-antrun-plugin - - - setup-test-dirs - process-test-resources - - run - - - - - - - - - - - - - - - - - - setup-metastore-scripts - process-test-resources - - run - - - - - - - - - - - - - - - - - 
- org.apache.maven.plugins - maven-failsafe-plugin - 2.20.1 - - - - integration-test - verify - - - - - true - false - -Xmx2048m - false - - true - ${test.tmp.dir} - ${test.tmp.dir} - true - - - ${log4j.conf.dir} - - ${skipITests} - - - - org.apache.maven.plugins - maven-surefire-plugin - ${maven.surefire.version} - - true - false - ${test.forkcount} - -Xmx2048m - false - - ${project.build.directory} - true - ${derby.version} - ${test.tmp.dir}/derby.log - - ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties - true - ${test.tmp.dir} - - jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true - false - ${test.tmp.dir} - ${test.warehouse.scheme}${test.warehouse.dir} - - - - ${log4j.conf.dir} - ${test.conf.dir} - - ${test.conf.dir}/conf - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - \ No newline at end of file diff --git upgrade-acid/pre-upgrade/pom.xml upgrade-acid/pre-upgrade/pom.xml new file mode 100644 index 0000000000..333b960e48 --- /dev/null +++ upgrade-acid/pre-upgrade/pom.xml @@ -0,0 +1,283 @@ + + + + + + org.apache.hive + hive-upgrade-acid + 4.0.0-SNAPSHOT + ../pom.xml + + + + 4.0.0 + + hive-pre-upgrade + Hive Pre Upgrade Acid + jar + + + ../.. + + + ${project.build.directory}/testconf + file:// + ${project.basedir}/src/test/resources + ${project.build.directory}/tmp + ${project.build.directory}/warehouse + file:// + 1 + true + 2.3.3 + 2.7.2 + + + + + commons-cli + commons-cli + 1.2 + provided + + + org.apache.hive + hive-metastore + ${hdp.hive.version} + provided + + + org.apache.hive + hive-exec + ${hdp.hive.version} + provided + + + org.apache.hadoop + hadoop-common + ${hdp.hadoop.version} + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-common + 2.7.2 + provided + + + org.apache.orc + orc-core + 1.3.3 + provided + + + + + + + + ${basedir}/src/main/resources + + package.jdo + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + ${maven.antrun.plugin.version} + + + ant-contrib + ant-contrib + ${ant.contrib.version} + + + ant + ant + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${maven.checkstyle.plugin.version} + + + org.codehaus.mojo + exec-maven-plugin + ${maven.exec.plugin.version} + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + setup-test-dirs + process-test-resources + + run + + + + + + + + + + + + + + + + + + setup-metastore-scripts + process-test-resources + + run + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 2.20.1 + + + + integration-test + verify + + + + + true + false + -Xmx2048m + false + + true + ${test.tmp.dir} + ${test.tmp.dir} + true + + + ${log4j.conf.dir} + + ${skipITests} + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven.surefire.version} + + true + false + ${test.forkcount} + -Xmx2048m + false + + ${project.build.directory} + true + ${derby.version} + ${test.tmp.dir}/derby.log + + ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties + true + ${test.tmp.dir} + + jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true + false + ${test.tmp.dir} + ${test.warehouse.scheme}${test.warehouse.dir} + + + + ${log4j.conf.dir} + ${test.conf.dir} + + ${test.conf.dir}/conf + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + \ No newline at end of file diff --git upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java similarity index 67% rename from 
upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
rename to upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
index 78c084392d..2547f25932 100644
--- upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
+++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
@@ -32,16 +32,17 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
@@ -72,21 +74,16 @@
 import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString;

 /**
- * This utility is designed to help with upgrading to Hive 3.0. On-disk layout for transactional
- * tables has changed in 3.0 and require pre-processing before upgrade to ensure they are readable
- * by Hive 3.0. Some transactional tables (identified by this utility) require Major compaction
- * to be run on them before upgrading to 3.0. Once this compaction starts, no more
- * update/delete/merge statements may be executed on these tables until upgrade is finished.
+ * This utility is designed to help with upgrading Hive 2.x to Hive 3.0. The on-disk layout for
+ * transactional tables has changed in 3.0 and requires pre-processing before upgrade to ensure
+ * they are readable by Hive 3.0. Some transactional tables (identified by this utility) require
+ * Major compaction to be run on them before upgrading to 3.0. Once this compaction starts, no
+ * more update/delete/merge statements may be executed on these tables until upgrade is finished.
  *
  * Additionally, a new type of transactional tables was added in 3.0 - insert-only tables. These
  * tables support ACID semantics and work with any Input/OutputFormat. Any Managed tables may
  * be made insert-only transactional table. These tables don't support Update/Delete/Merge commands.
  *
- * This utility works in 2 modes: preUpgrade and postUpgrade.
- * In preUpgrade mode it has to have 2.x Hive jars on the classpath. It will perform analysis on
- * existing transactional tables, determine which require compaction and generate a set of SQL
- * commands to launch all of these compactions.
- *
  * Note that depending on the number of tables/partitions and amount of data in them compactions
  * may take a significant amount of time and resources. The script output by this utility includes
  * some heuristics that may help estimate the time required. If no script is produced, no action
@@ -96,35 +93,37 @@
  * hive.compactor.job.queue may be used to set a Yarn queue name where all compaction jobs will be
  * submitted.
  *
- * In postUpgrade mode, Hive 3.0 jars/hive-site.xml should be on the classpath. This utility will
- * find all the tables that may be made transactional (with ful CRUD support) and generate
- * Alter Table commands to do so. It will also find all tables that may not support full CRUD
- * but can be made insert-only transactional tables and generate corresponding Alter Table commands.
- *
- * TODO: rename files
- *
- * "execute" option may be supplied in both modes to have the utility automatically execute the
+ * "execute" option may be supplied to have the utility automatically execute the
  * equivalent of the generated commands
  *
  * "location" option may be supplied followed by a path to set the location for the generated
  * scripts.
+ *
+ * Note:
+ * This utility connects to the Metastore via API. It may be necessary to set
+ * -Djavax.security.auth.useSubjectCredsOnly=false in a Kerberized environment if errors like
+ * "org.ietf.jgss.GSSException: No valid credentials provided (
+ * Mechanism level: Failed to find any Kerberos tgt)"
+ * show up after kinit.
+ *
+ * See also org.apache.hadoop.hive.ql.util.UpgradeTool in Hive 3.x.
  */
-public class UpgradeTool {
-  private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
+public class PreUpgradeTool {
+  private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class);
   private static final int PARTITION_BATCH_SIZE = 10000;
   private final Options cmdLineOptions = new Options();

   public static void main(String[] args) throws Exception {
-    UpgradeTool tool = new UpgradeTool();
+    PreUpgradeTool tool = new PreUpgradeTool();
     tool.init();
     CommandLineParser parser = new GnuParser();
     CommandLine line ;
     String outputDir = ".";
-    boolean preUpgrade = false, postUpgrade = false, execute = false, nonBlocking = false;
+    boolean execute = false;
     try {
       line = parser.parse(tool.cmdLineOptions, args);
     } catch (ParseException e) {
-      System.err.println("UpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
+      System.err.println("PreUpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
       printAndExit(tool);
       return;
     }
@@ -139,39 +138,23 @@ public static void main(String[] args) throws Exception {
     if(line.hasOption("execute")) {
       execute = true;
     }
-    if(line.hasOption("preUpgrade")) {
-      preUpgrade = true;
-    }
-    if(line.hasOption("postUpgrade")) {
-      postUpgrade = true;
-    }
-    LOG.info("Starting with preUpgrade=" + preUpgrade + ", postUpgrade=" + postUpgrade +
-        ", execute=" + execute + ", location=" + outputDir);
-    if(preUpgrade && postUpgrade) {
-      throw new IllegalArgumentException("Cannot specify both preUpgrade and postUpgrade");
-    }
+    LOG.info("Starting with execute=" + execute + ", location=" + outputDir);
     try {
       String hiveVer = HiveVersionInfo.getShortVersion();
-      if(preUpgrade) {
-        if(!hiveVer.startsWith("2.")) {
-          throw new IllegalStateException("preUpgrade requires Hive 2.x. 
Actual: " + hiveVer); - } - } - if(postUpgrade && execute && !isTestMode) { - if(!hiveVer.startsWith("3.")) { - throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " + - hiveVer); - } + LOG.info("Using Hive Version: " + HiveVersionInfo.getVersion() + " build: " + + HiveVersionInfo.getBuildVersion()); + if(!hiveVer.startsWith("2.")) { + throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer); } - tool.prepareAcidUpgradeInternal(outputDir, preUpgrade, postUpgrade, execute); + tool.prepareAcidUpgradeInternal(outputDir, execute); } catch(Exception ex) { - LOG.error("UpgradeTool failed", ex); + LOG.error("PreUpgradeTool failed", ex); throw ex; } } - private static void printAndExit(UpgradeTool tool) { + private static void printAndExit(PreUpgradeTool tool) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("upgrade-acid", tool.cmdLineOptions); System.exit(1); @@ -179,13 +162,8 @@ private static void printAndExit(UpgradeTool tool) { private void init() { try { - cmdLineOptions.addOption(new Option("help", "print this message")); - cmdLineOptions.addOption(new Option("preUpgrade", - "Generates a script to execute on 2.x cluster. This requires 2.x binaries" + - " on the classpath and hive-site.xml.")); - cmdLineOptions.addOption(new Option("postUpgrade", - "Generates a script to execute on 3.x cluster. This requires 3.x binaries" + - " on the classpath and hive-site.xml.")); + cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" + + " cluster. This requires 2.x binaries on the classpath and hive-site.xml.")); Option exec = new Option("execute", "Executes commands equivalent to generated scrips"); exec.setOptionalArg(true); @@ -198,27 +176,54 @@ private void init() { throw ex; } } + private static HiveMetaHookLoader getHookLoader() { + return new HiveMetaHookLoader() { + @Override + public HiveMetaHook getHook( + org.apache.hadoop.hive.metastore.api.Table tbl) { + return null; + } + }; + } + + private static IMetaStoreClient getHMS(HiveConf conf) { + UserGroupInformation loggedInUser = null; + try { + loggedInUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage()); + } + boolean secureMode = loggedInUser != null && loggedInUser.hasKerberosCredentials(); + if (secureMode) { + conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true); + } + try { + LOG.info("Creating metastore client for {}", "PreUpgradeTool"); + /* I'd rather call return RetryingMetaStoreClient.getProxy(conf, true) + which calls HiveMetaStoreClient(HiveConf, Boolean) which exists in + (at least) 2.1.0.2.6.5.0-292 and later but not in 2.1.0.2.6.0.3-8 (the HDP 2.6 release) + i.e. RetryingMetaStoreClient.getProxy(conf, true) is broken in 2.6.0*/ + return RetryingMetaStoreClient.getProxy(conf, + new Class[]{HiveConf.class, HiveMetaHookLoader.class, Boolean.class}, + new Object[]{conf, getHookLoader(), Boolean.TRUE}, null, HiveMetaStoreClient.class.getName()); + } catch (MetaException e) { + throw new RuntimeException("Error connecting to Hive Metastore URI: " + + conf.getVar(HiveConf.ConfVars.METASTOREURIS) + ". " + e.getMessage(), e); + } + } + /** - * todo: this should accept a file of table names to exclude from non-acid to acid conversion * todo: change script comments to a preamble instead of a footer - * - * how does rename script work? "hadoop fs -mv oldname newname" * and what what about S3? 
- * How does this actually get executed? - * all other actions are done via embedded JDBC - * - * */ - private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrade, - boolean postUpgrade, boolean execute) throws HiveException, TException, IOException { + private void prepareAcidUpgradeInternal(String scriptLocation, boolean execute) + throws HiveException, TException, IOException { HiveConf conf = hiveConf != null ? hiveConf : new HiveConf(); boolean isAcidEnabled = isAcidEnabled(conf); - HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException + IMetaStoreClient hms = getHMS(conf); LOG.debug("Looking for databases"); List databases = hms.getAllDatabases();//TException LOG.debug("Found " + databases.size() + " databases to process"); List compactions = new ArrayList<>(); - List convertToAcid = new ArrayList<>(); - List convertToMM = new ArrayList<>(); final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); ValidTxnList txns = null; Hive db = null; @@ -232,9 +237,9 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrad for(String tableName : tables) { Table t = hms.getTable(dbName, tableName); LOG.debug("processing table " + Warehouse.getQualifiedName(t)); - if(preUpgrade && isAcidEnabled) { + if(isAcidEnabled) { //if acid is off, there can't be any acid tables - nothing to compact - if(execute && txns == null) { + if(txns == null) { /* This API changed from 2.x to 3.0. so this won't even compile with 3.0 but it doesn't need to since we only run this preUpgrade @@ -246,18 +251,12 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrad getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns); compactions.addAll(compactionCommands); } - if(postUpgrade && isAcidEnabled) { - //if acid is off post upgrade, you can't make any tables acid - will throw - processConversion(t, convertToAcid, convertToMM, hms, db, execute); - } /*todo: handle renaming files somewhere*/ } } makeCompactionScript(compactions, scriptLocation, compactionMetaInfo); - makeConvertTableScript(convertToAcid, convertToMM, scriptLocation); - makeRenameFileScript(scriptLocation);//todo: is this pre or post upgrade? - //todo: can different tables be in different FileSystems? 
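To make the control flow above concrete: the strings collected into the compactions list end up in the generated script as plain ALTER TABLE ... COMPACT 'major' statements, which is the exact shape the test further down in this diff expects. A minimal sketch of assembling such a command follows; the helper class name is my own choosing and not part of the patch.

// Sketch only (CompactionCommandBuilder is an illustrative name, not from the patch).
// Mirrors the command shape asserted by TestPreUpgradeTool, e.g.
//   ALTER TABLE default.tacid COMPACT 'major';
//   ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major';
public final class CompactionCommandBuilder {
  private CompactionCommandBuilder() {}

  /** @param partitionSpec e.g. "p=10Y", or null for an unpartitioned table */
  public static String majorCompaction(String dbName, String tableName, String partitionSpec) {
    StringBuilder sb = new StringBuilder("ALTER TABLE ")
        .append(dbName).append('.').append(tableName);
    if (partitionSpec != null) {
      sb.append(" PARTITION(").append(partitionSpec).append(')');
    }
    return sb.append(" COMPACT 'major';").toString();
  }
}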
- if(preUpgrade && execute) { + + if(execute) { while(compactionMetaInfo.compactionIds.size() > 0) { LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() + " compactions to complete"); @@ -305,129 +304,7 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrad } } - /** - * Actualy makes the table transactional - */ - private static void alterTable(Table t, Hive db, boolean isMM) - throws HiveException, InvalidOperationException { - org.apache.hadoop.hive.ql.metadata.Table metaTable = - //clone to make sure new prop doesn't leak - new org.apache.hadoop.hive.ql.metadata.Table(t.deepCopy()); - metaTable.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); - if(isMM) { - metaTable.getParameters() - .put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only"); - } - db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, null); - } - - /** - * todo: handle exclusion list - * Figures out which tables to make Acid, MM and (optionally, performs the operation) - */ - private static void processConversion(Table t, List convertToAcid, - List convertToMM, HiveMetaStoreClient hms, Hive db, boolean execute) - throws TException, HiveException { - if(isFullAcidTable(t)) { - return; - } - if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) { - return; - } - String fullTableName = Warehouse.getQualifiedName(t); - if(t.getPartitionKeysSize() <= 0) { - if(canBeMadeAcid(fullTableName, t.getSd())) { - convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true')"); - if(execute) { - alterTable(t, db, false); - } - } - else { - convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true', 'transactional_properties'='insert_only')"); - if(execute) { - alterTable(t, db, true); - } - } - } - else { - /* - each Partition may have different I/O Format so have to check them all before deciding to - make a full CRUD table. 
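The removed conversion logic below this comment checks partitions in batches of PARTITION_BATCH_SIZE so that tables with very many partitions do not exhaust client memory. As a standalone illustration of that batching pattern (helper name and consumer interface are my own, not from the patch, assuming a Hive 2.x IMetaStoreClient):

// Sketch only: fetch partitions in fixed-size batches via listPartitionNames +
// getPartitionsByNames, so the full Partition objects never all sit in memory at once.
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;

final class PartitionBatcher {
  private PartitionBatcher() {}

  static void forEachPartitionBatch(IMetaStoreClient hms, String dbName, String tableName,
      int batchSize, Consumer<List<Partition>> consumer) throws TException {
    List<String> partNames = hms.listPartitionNames(dbName, tableName, (short) -1);
    for (int start = 0; start < partNames.size(); start += batchSize) {
      int end = Math.min(start + batchSize, partNames.size());
      List<Partition> batch =
          hms.getPartitionsByNames(dbName, tableName, partNames.subList(start, end));
      consumer.accept(batch);
    }
  }
}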
- Run in batches to prevent OOM - */ - List partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1); - int batchSize = PARTITION_BATCH_SIZE; - int numWholeBatches = partNames.size()/batchSize; - for(int i = 0; i < numWholeBatches; i++) { - List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), - partNames.subList(i * batchSize, (i + 1) * batchSize)); - if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) { - return; - } - } - if(numWholeBatches * batchSize < partNames.size()) { - //last partial batch - List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), - partNames.subList(numWholeBatches * batchSize, partNames.size())); - if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) { - return; - } - } - //if here checked all parts and they are Acid compatible - make it acid - convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true')"); - if(execute) { - alterTable(t, db, false); - } - } - } - /** - * @return true if table was converted/command generated - */ - private static boolean alterTable(String fullTableName, List partitionList, - List convertToMM, Table t, Hive db, boolean execute) - throws InvalidOperationException, HiveException { - for(Partition p : partitionList) { - if(!canBeMadeAcid(fullTableName, p.getSd())) { - convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true', 'transactional_properties'='insert_only')"); - if(execute) { - alterTable(t, db, true); - } - return true; - } - } - return false; - } - private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) { - return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0; - } - private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) { - try { - Class inputFormatClass = sd.getInputFormat() == null ? null : - Class.forName(sd.getInputFormat()); - Class outputFormatClass = sd.getOutputFormat() == null ? 
null : - Class.forName(sd.getOutputFormat()); - if (inputFormatClass != null && outputFormatClass != null && - Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat") - .isAssignableFrom(inputFormatClass) && - Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat") - .isAssignableFrom(outputFormatClass)) { - return true; - } - } catch (ClassNotFoundException e) { - //if a table is using some custom I/O format and it's not in the classpath, we won't mark - //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only - //Acid format - LOG.error("Could not determine if " + fullTableName + - " can be made Acid due to: " + e.getMessage(), e); - return false; - } - return false; - } /** * Generates a set compaction commands to run on pre Hive 3 cluster */ @@ -464,29 +341,6 @@ private static void makeCompactionScript(List commands, String scriptLoc "-- capacity of this queue appropriately"); } } - private static void makeConvertTableScript(List alterTableAcid, List alterTableMm, - String scriptLocation) throws IOException { - if (alterTableAcid.isEmpty()) { - LOG.info("No acid conversion is necessary"); - } else { - String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql"; - LOG.debug("Writing CRUD conversion commands to " + fileName); - try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) { - //todo: fix this - it has to run in 3.0 since tables may be unbucketed - pw.println("-- These commands may be executed by Hive 1.x later"); - } - } - - if (alterTableMm.isEmpty()) { - LOG.info("No managed table conversion is necessary"); - } else { - String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql"; - LOG.debug("Writing managed table conversion commands to " + fileName); - try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) { - pw.println("-- These commands must be executed by Hive 3.0 or later"); - } - } - } private static PrintWriter createScript(List commands, String fileName, String scriptLocation) throws IOException { @@ -497,22 +351,11 @@ private static PrintWriter createScript(List commands, String fileName, } return pw; } - private static void makeRenameFileScript(String scriptLocation) throws IOException { - List commands = Collections.emptyList(); - if (commands.isEmpty()) { - LOG.info("No file renaming is necessary"); - } else { - String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh"; - LOG.debug("Writing file renaming commands to " + fileName); - PrintWriter pw = createScript(commands, fileName, scriptLocation); - pw.close(); - } - } /** * @return any compaction commands to run for {@code Table t} */ private static List getCompactionCommands(Table t, HiveConf conf, - HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db, + IMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db, ValidTxnList txns) throws IOException, TException, HiveException { if(!isFullAcidTable(t)) { return Collections.emptyList(); @@ -795,11 +638,6 @@ void onWaitForCompaction() throws MetaException {} static Callback callback; @VisibleForTesting static int pollIntervalMs = 1000*30; - /** - * Also to enable testing until I set up Maven profiles to be able to run with 3.0 jars - */ - @VisibleForTesting - static boolean isTestMode = false; /** * can set it from tests to test when config needs something other than default values */ diff --git 
upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java similarity index 72% rename from upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java rename to upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java index c8964a4a4c..4fe7007c96 100644 --- upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java +++ upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java @@ -52,10 +52,10 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -public class TestUpgradeTool { - private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class); +public class TestPreUpgradeTool { + private static final Logger LOG = LoggerFactory.getLogger(TestPreUpgradeTool.class); private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + - File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis() + File.separator + TestPreUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); private String getTestDataDir() { @@ -78,6 +78,7 @@ public void testUpgrade() throws Exception { runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p tinyint) clustered by (b) into 2 buckets stored" + " as orc TBLPROPERTIES ('transactional'='true')"); + //on 2.x these are guaranteed to not be acid runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')"); @@ -99,19 +100,19 @@ public void testUpgrade() throws Exception { //todo: add partitioned table that needs conversion to MM/Acid //todo: rename files case - String[] args = {"-location", getTestDataDir(), "-preUpgrade", "-execute"}; - UpgradeTool.callback = new UpgradeTool.Callback() { + String[] args = {"-location", getTestDataDir(), "-execute"}; + PreUpgradeTool.callback = new PreUpgradeTool.Callback() { @Override void onWaitForCompaction() throws MetaException { runWorker(hiveConf); } }; - UpgradeTool.pollIntervalMs = 1; - UpgradeTool.hiveConf = hiveConf; - UpgradeTool.main(args); + PreUpgradeTool.pollIntervalMs = 1; + PreUpgradeTool.hiveConf = hiveConf; + PreUpgradeTool.main(args); /* todo: parse - target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286256834/compacts_1527286277624.sql + target/tmp/org.apache.hadoop.hive.upgrade.acid.TestPreUpgradeTool-1527286256834/compacts_1527286277624.sql make sure it's the only 'compacts' file and contains ALTER TABLE default.tacid COMPACT 'major'; ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major'; @@ -125,68 +126,13 @@ void onWaitForCompaction() throws MetaException { Assert.assertEquals(e.toString(), TxnStore.CLEANING_RESPONSE, e.getState()); } - String[] args2 = {"-location", getTestDataDir(), "-postUpgrade"}; - UpgradeTool.main(args2); + String[] args2 = {"-location", getTestDataDir()}; + PreUpgradeTool.main(args2); /* - * todo: parse - * convertToAcid_1527286288784.sql make sure it has - * ALTER TABLE default.tflat SET TBLPROPERTIES ('transactional'='true'); - * 
convertToMM_1527286288784.sql make sure it has - * ALTER TABLE default.tflattext SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only'); + * todo: parse compacts script - make sure there is nothing in it * */ } - /** - * includes 'execute' for postUpgrade - * @throws Exception - */ - @Test - public void testPostUpgrade() throws Exception { - int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; - hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "dynamic"); - runStatementOnDriver("drop table if exists TAcid"); - runStatementOnDriver("drop table if exists TAcidPart"); - runStatementOnDriver("drop table if exists TFlat"); - runStatementOnDriver("drop table if exists TFlatText"); - - runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); - runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) clustered by (b) into 2 buckets stored" + - " as orc TBLPROPERTIES ('transactional'='false')"); - //to create some partitions - runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart)); - - - //todo: to test these need to link against 3.x libs - maven profiles? - //runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')"); - //runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')"); - - Hive db = Hive.get(hiveConf); - Table tacid = db.getTable("default", "tacid"); - Assert.assertEquals("Expected TAcid to become full acid", false, AcidUtils.isAcidTable(tacid)); - Table tacidpart = db.getTable("default", "tacidpart"); - Assert.assertEquals("Expected TAcidPart to become full acid", false, - AcidUtils.isAcidTable(tacidpart)); - - - String[] args2 = {"-location", getTestDataDir(), "-postUpgrade", "-execute"}; - UpgradeTool.isTestMode = true; - UpgradeTool.hiveConf = hiveConf; - UpgradeTool.main(args2); - - tacid = db.getTable("default", "tacid"); - Assert.assertEquals("Expected TAcid to become full acid", true, AcidUtils.isAcidTable(tacid)); - tacidpart = db.getTable("default", "tacidpart"); - Assert.assertEquals("Expected TAcidPart to become full acid", true, - AcidUtils.isAcidTable(tacidpart)); - - /** - todo: parse - target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286026461/convertToAcid_1527286063065.sql - make sure it has: - ALTER TABLE default.tacid SET TBLPROPERTIES ('transactional'='true'); - ALTER TABLE default.tacidpart SET TBLPROPERTIES ('transactional'='true'); - */ - } private static void runWorker(HiveConf hiveConf) throws MetaException { AtomicBoolean stop = new AtomicBoolean(true); Worker t = new Worker();
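The "todo: parse" notes in testUpgrade() above leave the generated-script check unimplemented. A minimal sketch of what that verification could look like is shown here, assuming JUnit 4 (already used by this test) and the compacts_<timestamp>.sql naming mentioned in the comments; the method name is illustrative and not part of the patch.

// Sketch only: assert that exactly one compacts_*.sql script was produced and that it
// contains the expected major-compaction commands quoted in the test comments above.
private void assertCompactionScript(String testDataDir) throws Exception {
  java.io.File[] scripts = new java.io.File(testDataDir)
      .listFiles((dir, name) -> name.startsWith("compacts_") && name.endsWith(".sql"));
  Assert.assertNotNull(scripts);
  Assert.assertEquals("expected exactly one compacts script", 1, scripts.length);
  String content = new String(
      java.nio.file.Files.readAllBytes(scripts[0].toPath()),
      java.nio.charset.StandardCharsets.UTF_8);
  Assert.assertTrue(content.contains("ALTER TABLE default.tacid COMPACT 'major';"));
  Assert.assertTrue(content.contains("ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major';"));
}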