diff --git pom.xml pom.xml
index 1f43c416db..d19398f651 100644
--- pom.xml
+++ pom.xml
@@ -62,6 +62,7 @@
     <module>testutils</module>
     <module>packaging</module>
     <module>standalone-metastore</module>
+    <module>upgrade-acid</module>
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index 875eba3c67..283f945dc5 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -410,7 +410,7 @@ private void prepareAcidUpgrade(String scriptLocation) {
   /**
    * todo: make sure compaction queue is configured and has ample capacity
    * todo: what to do on failure? Suppose some table/part is not readable. should it produce
-   * todo: should probably suppor dryRun mode where we output scripts but instead of renaming files
+   * todo: should probably support dryRun mode where we output scripts but instead of renaming files
    * we generate a renaming script. Alternatively, always generate a renaming script and have
    * user execute it - this is probably a better option. If script is not empty on rerun someone
    * added files to table to be made Acid.
@@ -427,6 +427,37 @@ private void prepareAcidUpgrade(String scriptLocation) {
    * todo: this should accept a file of table names to exclude from non-acid to acid conversion
    * todo: change script comments to a preamble instead of a footer
    *
+   *
+   * How to make this blocking? For file renames and converting to Acid/MM we can do it synchronously;
+   * for compactions we have to enqueue all of them and then wait for them to complete.
+   * Ideally we'd just wait for the compaction IDs that we started, i.e. until each reaches
+   * 'ready for cleaning' or 'succeeded' state.
+   * BTW, what if a compaction fails? Do we resubmit?
+   * What if for a given tbl/part there is a prior failure? Can we tell the difference?
+   *
+   * DDLTask.compact() writes a msg with the ID to the Console - this way we know what we enqueued, if we can parse it out.
+   * Hive.compact2() returns CompactionResponse which has the ID - maybe we can schedule compactions manually.
+   * Hive.get(conf) to get Hive.
+   *
+   * Can we instead do SHOW COMPACTIONS, get the highest ID before we start, and then wait for everything above it to finish?
+   *
+   * Start with the basic approach: do SHOW COMPACTIONS, get the min ID, submit all, get the max ID, and make
+   * sure everything in between has completed.
+   *
+   * Embedded HS2 is just JDBC with the appropriate URL:
+   * https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-ConnectionURLforRemoteorEmbeddedMode
+   * Submitting compaction commands via JDBC will definitely not return any compaction IDs.
+   *
+   * Thejas said that Hive.get(conf) should be fine as long as the right hive-site.xml is on the class path
+   * and the tool is run as the 'hive' user - this way it will be able to get to MySQL.
+   *
+   * This should output scripts in all cases to provide a record of what it's doing.
+   * For blocking compaction submission, use Hive.compact2() directly to get the compaction ID and wait for it.
+   * This will happen with 2.6 Hive binaries on the CP.
+   * How does the rename script work? "hadoop fs -mv oldname newname" - and what about S3?
+   * How does this actually get executed?
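+   *
+   * A rough sketch of doing the renames directly instead of generating a shell script (not in this
+   * patch; the file names below are made up for illustration). FileSystem.rename() works the same way
+   * against HDFS and blob stores like S3A, though on S3 a rename is a copy + delete and is not atomic:
+   * <pre>
+   *   Path oldFile = new Path(partitionDir, "000001_0");        // hypothetical original file
+   *   Path newFile = new Path(partitionDir, "000001_0_copy_1");  // hypothetical target name
+   *   FileSystem fs = oldFile.getFileSystem(conf);
+   *   if (!fs.rename(oldFile, newFile)) {
+   *     throw new IOException("Could not rename " + oldFile + " to " + newFile);
+   *   }
+   * </pre>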
+ * all other actions are done via embedded JDBC + * * @throws MetaException * @throws TException */ @@ -448,7 +479,7 @@ private void prepareAcidUpgradeInternal(String scriptLocation) throws MetaExcept for(String tableName : tables) { Table t = hms.getTable(dbName, tableName); - //ql depends on metastore and is not accessible here... and if it was, I would not be using + //ql depends on metastore and is not accessible here... and if it was, it would not be using //2.6 exec jar, but 3.0.... which is not what we want List compactionCommands = getCompactionCommands(t, conf, hms, compactionMetaInfo); compactions.addAll(compactionCommands); @@ -705,6 +736,7 @@ public boolean accept(Path path) { if(needsCompaction(bucket, fs)) { //found delete events - this 'location' needs compacting compactionMetaInfo.numberOfBytes += getDataSize(location, conf); + //todo: this is not remotely accurate if you have many (relevant) original files return true; } } @@ -738,6 +770,7 @@ private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws } return as.deletes > 0 || as.updates > 0; } + //todo: add option to submit directly and a Set to add compaction id private static String getCompactionCommand(Table t, Partition p) { StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t)); if(t.getPartitionKeysSize() > 0) { @@ -745,7 +778,7 @@ private static String getCompactionCommand(Table t, Partition p) { Warehouse.getQualifiedName(t); sb.append(" PARTITION("); for (int i = 0; i < t.getPartitionKeysSize(); i++) { - //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote + //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote - check SemanticAnalyzer sb.append(t.getPartitionKeys().get(i).getName()).append('=') .append(p.getValues().get(i)).append(","); } diff --git upgrade-acid/pom.xml upgrade-acid/pom.xml new file mode 100644 index 0000000000..02f02a407a --- /dev/null +++ upgrade-acid/pom.xml @@ -0,0 +1,467 @@ + + + + + + org.apache + apache + + 18 + + + 4.0.0 + + org.apache.hive + 4.0.0-SNAPSHOT + upgrade-acid + Hive Upgrade Acid + jar + + + + UTF-8 + 1.8 + 1.8 + false + ${settings.localRepository} + 2.3 + 1.6.0 + .. 
+ + 1.0b3 + 1.7 + ${basedir}/checkstyle/ + 2.17 + 2.20.1 + + + ${project.build.directory}/testconf + file:// + ${project.basedir}/src/test/resources + ${project.build.directory}/tmp + ${project.build.directory}/warehouse + file:// + 1 + true + + + + commons-cli + commons-cli + + 1.2 + + + org.apache.hive + hive-metastore + 2.3.3 + + + org.apache.hive + hive-exec + 2.3.3 + + + org.apache.hadoop + hadoop-common + 2.7.2 + + + + org.apache.hadoop + hadoop-mapreduce-client-common + 2.7.2 + + + + org.apache.orc + orc-core + 1.3.3 + + + + + + + + + ${basedir}/src/main/resources + + package.jdo + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + ${maven.antrun.plugin.version} + + + ant-contrib + ant-contrib + ${ant.contrib.version} + + + ant + ant + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${maven.checkstyle.plugin.version} + + + org.codehaus.mojo + exec-maven-plugin + ${maven.exec.plugin.version} + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + setup-test-dirs + process-test-resources + + run + + + + + + + + + + + + + + + + + + + setup-metastore-scripts + process-test-resources + + run + + + + + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 2.20.1 + + + + integration-test + verify + + + + + true + false + -Xmx2048m + false + + true + ${test.tmp.dir} + ${test.tmp.dir} + true + + + ${log4j.conf.dir} + + ${skipITests} + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven.surefire.version} + + true + false + ${test.forkcount} + -Xmx2048m + false + + ${project.build.directory} + true + ${derby.version} + ${test.tmp.dir}/derby.log + + ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties + true + ${test.tmp.dir} + + jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true + false + ${test.tmp.dir} + ${test.warehouse.scheme}${test.warehouse.dir} + + + + ${log4j.conf.dir} + ${test.conf.dir} + + ${test.conf.dir}/conf + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + + + \ No newline at end of file diff --git upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java new file mode 100644 index 0000000000..651beaef1c --- /dev/null +++ upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java @@ -0,0 +1,457 @@ +package org.apache.hadoop.hive.upgrade.acid; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Needs pre-upgrade and post-upgrade modes since, for example, we don't want to run compaction after
+ * the upgrade. Should support a dry-run mode that just generates all the scripts; otherwise it executes them.
+ */
+public class UpgradeTool {
+  private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
+  private final Options cmdLineOptions = new Options();
+
+  public static void main(String[] args) throws Exception {
+    CommandLineParser parser = new GnuParser();
+    CommandLine line = null;
+    UpgradeTool tool = new UpgradeTool();
+    try {
+      tool.prepareAcidUpgradeInternal(".");
+    }
+    catch(Exception ex) {
+      LOG.error("UpgradeTool failed", ex);
+      throw ex;
+    }
+  }
+  private static class CompactionMetaInfo {
+    /**
+     * total number of bytes to be compacted across all compaction commands
+     */
+    long numberOfBytes;
+    //BlockStoragePolicySpi f;//todo: this is only in hadoop 3.0
+  }
+  /**
+   * todo: make sure compaction queue is configured and has ample capacity
+   * todo: what to do on failure? Suppose some table/part is not readable. should it produce
+   * todo: should probably support dryRun mode where we output scripts but instead of renaming files
+   * we generate a renaming script. Alternatively, always generate a renaming script and have
+   * user execute it - this is probably a better option. If script is not empty on rerun someone
+   * added files to table to be made Acid.
+   * commands for all other tables?
+   * todo: how do we test this? even if we had 2.x data it won't be readable in 3.0. even w/o any
+   * updates, txnids in the data won't make sense in 3.0 w/o actually performing equivalent of
+   * metastore upgrade to init writeid table. Also, can we even create a new table and set location
+   * to existing files to simulate a 2.x table?
+   * todo: generate some instructions in compaction script to include total compactions to perform,
+   * total data volume to handle and anything else that may help users guess at how long it will
+   * take. Also, highlight tuning options to speed this up.
+   * todo: can we make the script blocking so that it waits for cleaner to delete files?
+   * need to poll SHOW COMPACTIONS and make sure that all partitions are in "finished" state
+   * todo: this should accept a file of table names to exclude from non-acid to acid conversion
+   * todo: change script comments to a preamble instead of a footer
+   *
+   *
+   * How to make this blocking? For file renames and converting to Acid/MM we can do it synchronously;
+   * for compactions we have to enqueue all of them and then wait for them to complete.
+   * Ideally we'd just wait for the compaction IDs that we started, i.e. until each reaches
+   * 'ready for cleaning' or 'succeeded' state.
+   * BTW, what if a compaction fails? Do we resubmit?
+   * What if for a given tbl/part there is a prior failure? Can we tell the difference?
+   *
+   * DDLTask.compact() writes a msg with the ID to the Console - this way we know what we enqueued, if we can parse it out.
+   * Hive.compact2() returns CompactionResponse which has the ID - maybe we can schedule compactions manually.
+   * Hive.get(conf) to get Hive.
+   *
+   * Can we instead do SHOW COMPACTIONS, get the highest ID before we start, and then wait for everything above it to finish?
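+   *
+   * A rough sketch of the basic polling approach described next (not in this patch; assumes
+   * ShowCompactResponseElement exposes getId()/getState() as the 2.x/3.x metastore API does, and
+   * that old entries without an ID report 0):
+   * <pre>
+   *   long maxIdBefore = 0;
+   *   for (ShowCompactResponseElement e : hms.showCompactions().getCompacts()) {
+   *     maxIdBefore = Math.max(maxIdBefore, e.getId());
+   *   }
+   *   // ... submit all of the generated ALTER TABLE ... COMPACT 'major' commands ...
+   *   boolean allDone;
+   *   do {
+   *     allDone = true;
+   *     for (ShowCompactResponseElement e : hms.showCompactions().getCompacts()) {
+   *       if (e.getId() > maxIdBefore && !"succeeded".equalsIgnoreCase(e.getState())
+   *           && !"ready for cleaning".equalsIgnoreCase(e.getState())) {
+   *         allDone = false;//still initiated/working - keep waiting
+   *       }
+   *     }
+   *     if (!allDone) {
+   *       Thread.sleep(60 * 1000);
+   *     }
+   *   } while (!allDone);
+   *   // a real implementation would also have to treat 'failed'/'attempted' specially and bound the wait
+   * </pre>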
+   *
+   * Start with the basic approach: do SHOW COMPACTIONS, get the min ID, submit all, get the max ID, and make
+   * sure everything in between has completed.
+   *
+   * Embedded HS2 is just JDBC with the appropriate URL:
+   * https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-ConnectionURLforRemoteorEmbeddedMode
+   * Submitting compaction commands via JDBC will definitely not return any compaction IDs.
+   *
+   * Thejas said that Hive.get(conf) should be fine as long as the right hive-site.xml is on the class path
+   * and the tool is run as the 'hive' user - this way it will be able to get to MySQL.
+   *
+   * This should output scripts in all cases to provide a record of what it's doing.
+   * For blocking compaction submission, use Hive.compact2() directly to get the compaction ID and wait for it.
+   * This will happen with 2.6 Hive binaries on the CP.
+   * How does the rename script work? "hadoop fs -mv oldname newname" - and what about S3?
+   * How does this actually get executed?
+   * All other actions are done via embedded JDBC.
+   *
+   * @throws MetaException
+   * @throws TException
+   */
+  private void prepareAcidUpgradeInternal(String scriptLocation) throws MetaException, TException, IOException {
+    //Configuration conf = MetastoreConf.newMetastoreConf();
+    HiveConf conf = new HiveConf();
+    System.out.println("Looking for Acid tables that need to be compacted");
+    //todo: check if acid is enabled and bail if not
+    //todo: check that running on 2.x?
+    HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+    List<String> databases = hms.getAllDatabases();//TException
+    System.out.println("Found " + databases.size() + " databases to process");
+    List<String> compactions = new ArrayList<>();
+    List<String> convertToAcid = new ArrayList<>();
+    List<String> convertToMM = new ArrayList<>();
+    final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+    for(String dbName : databases) {
+      List<String> tables = hms.getAllTables(dbName);
+      System.out.println("found " + tables.size() + " tables in " + dbName);
+      for(String tableName : tables) {
+        Table t = hms.getTable(dbName, tableName);
+
+        //ql depends on metastore and is not accessible here... and if it was, it would not be using
+        //2.6 exec jar, but 3.0.... which is not what we want
+        List<String> compactionCommands = getCompactionCommands(t, conf, hms, compactionMetaInfo);
+        compactions.addAll(compactionCommands);
+        processConversion(t, convertToAcid, convertToMM, hms);
+        /*todo: handle renaming files somewhere
+        * */
+      }
+    }
+    makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
+    makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+    makeRenameFileScript(scriptLocation);
+  }
+  //todo: handle exclusion list
+  private static void processConversion(Table t, List<String> convertToAcid,
+      List<String> convertToMM, HiveMetaStoreClient hms) throws TException {
+    if(isFullAcidTable(t)) {
+      return;
+    }
+    if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
+      return;
+    }
+    String fullTableName = Warehouse.getQualifiedName(t);
+    if(t.getPartitionKeysSize() <= 0) {
+      if(canBeMadeAcid(fullTableName, t.getSd())) {
+        convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES ("
+            + "'transactional'='true')");
+      }
+      else {
+        convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES ("
+            + "'transactional'='true', 'transactional_properties'='insert_only')");
+      }
+    }
+    else {
+      List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+      int batchSize = 10000;//todo: right size?
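+      //Partitions are fetched in fixed-size batches rather than all at once so that a table with a very
+      //large number of partitions does not require materializing every Partition object in a single
+      //getPartitionsByNames() call.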
+      int numWholeBatches = partNames.size()/batchSize;
+      for(int i = 0; i < numWholeBatches; i++) {
+        List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+            partNames.subList(i * batchSize, (i + 1) * batchSize));
+        for(Partition p : partitionList) {
+          if(!canBeMadeAcid(fullTableName, p.getSd())) {
+            convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES ("
+                + "'transactional'='true', 'transactional_properties'='insert_only')");
+            return;
+          }
+        }
+      }
+      if(numWholeBatches * batchSize < partNames.size()) {
+        //last partial batch
+        List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+            partNames.subList(numWholeBatches * batchSize, partNames.size()));
+        for (Partition p : partitionList) {
+          if (!canBeMadeAcid(fullTableName, p.getSd())) {
+            convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES ("
+                + "'transactional'='true', 'transactional_properties'='insert_only')");
+            return;
+          }
+        }
+      }
+      //if we get here, all partitions were checked and they are Acid compatible - make the table Acid
+      convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES ("
+          + "'transactional'='true')");
+    }
+  }
+  private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
+    return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
+  }
+  private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
+    try {
+      Class<?> inputFormatClass = sd.getInputFormat() == null ? null :
+          Class.forName(sd.getInputFormat());
+      Class<?> outputFormatClass = sd.getOutputFormat() == null ? null :
+          Class.forName(sd.getOutputFormat());
+
+      if (inputFormatClass != null && outputFormatClass != null &&
+          Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
+              .isAssignableFrom(inputFormatClass) &&
+          Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
+              .isAssignableFrom(outputFormatClass)) {
+        return true;
+      }
+    } catch (ClassNotFoundException e) {
+      //if a table is using some custom I/O format and it's not in the classpath, we won't mark
+      //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
+      //Acid format
+      System.err.println("Could not determine if " + fullTableName +
+          " can be made Acid due to: " + e.getMessage());
+      return false;
+    }
+    return false;
+  }
+  /**
+   * Currently writes to the current dir (whatever that is).
+   * If there is nothing to compact, outputs an empty file so as not to confuse the output with a
+   * failed run.
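+   * The generated script is plain SQL, one command per line, e.g. (table names here are purely
+   * illustrative):
+   * <pre>
+   *   ALTER TABLE default.tacid COMPACT 'major';
+   *   ALTER TABLE default.tacidpart PARTITION(p=10) COMPACT 'major';
+   *   -- Generated total of 2 compaction commands
+   * </pre>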
+   * todo: add some config to tell it where to put the script
+   */
+  private static void makeCompactionScript(List<String> commands, String scriptLocation,
+      CompactionMetaInfo compactionMetaInfo) throws IOException {
+    if (commands.isEmpty()) {
+      System.out.println("No compaction is necessary");
+      return;
+    }
+    String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
+    System.out.println("Writing compaction commands to " + fileName);
+    try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
+      //add post script
+      pw.println("-- Generated total of " + commands.size() + " compaction commands");
+      if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+        //to see it working in UTs
+        pw.println("-- The total volume of data to be compacted is " +
+            String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+      }
+      else {
+        pw.println("-- The total volume of data to be compacted is " +
+            String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+      }
+      pw.println();
+      pw.println(
+          "-- Please note that compaction may be a heavyweight and time consuming process.\n" +
+              "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
+              "-- which they will be picked up by compactor Workers. The max number of\n" +
+              "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
+              "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" +
+              "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
+              "-- property if defined or 'default' if not defined. It's advisable to set the\n" +
+              "-- capacity of this queue appropriately");
+    }
+  }
+  private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
+      String scriptLocation) throws IOException {
+    if (alterTableAcid.isEmpty()) {
+      System.out.println("No acid conversion is necessary");
+    } else {
+      String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
+      System.out.println("Writing acid conversion commands to " + fileName);
+      try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
+        //todo: fix this - it has to run in 3.0 since tables may be unbucketed
+        pw.println("-- These commands may be executed by Hive 1.x or later");
+      }
+    }
+
+    if (alterTableMm.isEmpty()) {
+      System.out.println("No managed table conversion is necessary");
+    } else {
+      String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
+      System.out.println("Writing managed table conversion commands to " + fileName);
+      try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
+        pw.println("-- These commands must be executed by Hive 3.0 or later");
+      }
+    }
+  }
+
+  private static PrintWriter createScript(List<String> commands, String fileName,
+      String scriptLocation) throws IOException {
+    //todo: make sure to create the file in 'scriptLocation' dir
+    FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
+    PrintWriter pw = new PrintWriter(fw);
+    for(String cmd : commands) {
+      pw.println(cmd + ";");
+    }
+    return pw;
+  }
+  private static void makeRenameFileScript(String scriptLocation) throws IOException {
+    List<String> commands = Collections.emptyList();
+    if (commands.isEmpty()) {
+      System.out.println("No file renaming is necessary");
+    } else {
+      String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
+      System.out.println("Writing file renaming commands to " + fileName);
+      PrintWriter pw = createScript(commands, fileName, scriptLocation);
+      pw.close();
+    }
+  }
+  /**
+   * @return any compaction commands to run for {@code Table t}
+   */
+  private static List<String> getCompactionCommands(Table t, HiveConf conf,
+      HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo)
+      throws IOException, TException {
+    if(!isFullAcidTable(t)) {
+      return Collections.emptyList();
+    }
+    if(t.getPartitionKeysSize() <= 0) {
+      //not partitioned
+      if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo)) {
+        return Collections.emptyList();
+      }
+
+      List<String> cmds = new ArrayList<>();
+      cmds.add(getCompactionCommand(t, null));
+      return cmds;
+    }
+    List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+    int batchSize = 10000;//todo: right size?
+    int numWholeBatches = partNames.size()/batchSize;
+    List<String> compactionCommands = new ArrayList<>();
+    for(int i = 0; i < numWholeBatches; i++) {
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+          partNames.subList(i * batchSize, (i + 1) * batchSize));
+      for(Partition p : partitionList) {
+        if(needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    if(numWholeBatches * batchSize < partNames.size()) {
+      //last partial batch
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+          partNames.subList(numWholeBatches * batchSize, partNames.size()));
+      for (Partition p : partitionList) {
+        if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    return compactionCommands;
+  }
+  /**
+   *
+   * @param location - path to a partition (or table if not partitioned) dir
+   */
+  private static boolean needsCompaction(Path location, HiveConf conf,
+      CompactionMetaInfo compactionMetaInfo) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        //checking for delete_delta is only so that this functionality can be exercised by 3.0 code,
+        //which cannot produce any deltas with a mix of update/insert events
+        return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+      }
+    });
+    if(deltas == null || deltas.length == 0) {
+      //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
+      //only if there are update/delete events.
+      return false;
+    }
+    deltaLoop: for(FileStatus delta : deltas) {
+      if(!delta.isDirectory()) {
+        //should never happen - just in case
+        continue;
+      }
+      FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+          //bucket_x or bucket_x__flush_length
+          return path.getName().startsWith("bucket_");
+        }
+      });
+      for(FileStatus bucket : buckets) {
+        if(bucket.getPath().getName().endsWith("_flush_length")) {
+          //streaming ingest dir - cannot have update/delete events
+          continue deltaLoop;
+        }
+        if(needsCompaction(bucket, fs)) {
+          //found delete events - this 'location' needs compacting
+          compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+          //todo: this is not remotely accurate if you have many (relevant) original files
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param location - path to a partition (or table if not partitioned) dir
+   * @throws IOException
+   */
+  private static long getDataSize(Path location, HiveConf conf) throws IOException {
+    /*
+     * todo: Figure out the size of the partition. The
+     * best way is to getAcidState() and look at each file - this way it takes care of
+     * original files vs base and any other obsolete files. For now just brute force it,
+     * it's likely close enough for a rough estimate.*/
+    FileSystem fs = location.getFileSystem(conf);
+    ContentSummary cs = fs.getContentSummary(location);
+    return cs.getLength();
+  }
+  private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+    //create reader, look at footer
+    //no need to check side file since it can only be in a streaming ingest delta
+    Reader orcReader = OrcFile.createReader(bucket.getPath(), OrcFile.readerOptions(fs.getConf())
+        .filesystem(fs));
+    AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
+    if(as == null) {
+      //should never happen since we are reading bucket_x written by acid write
+      throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+    }
+    return as.deletes > 0 || as.updates > 0;
+  }
+  //todo: add option to submit directly and a Set to add compaction id
+  private static String getCompactionCommand(Table t, Partition p) {
+    StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+    if(t.getPartitionKeysSize() > 0) {
+      assert p != null : "must supply partition for partitioned table " +
+          Warehouse.getQualifiedName(t);
+      sb.append(" PARTITION(");
+      for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+        //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote - check SemanticAnalyzer
+        sb.append(t.getPartitionKeys().get(i).getName()).append('=')
+            .append(p.getValues().get(i)).append(",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+    }
+    return sb.append(" COMPACT 'major'").toString();
+  }
+  private static boolean isFullAcidTable(Table t) {
+    if (t.getParametersSize() <= 0) {
+      //cannot be acid
+      return false;
+    }
+    String transactionalValue = t.getParameters()
+        .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (transactionalValue != null && "true".equalsIgnoreCase(transactionalValue)) {
+      System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+      return true;
+    }
+    return false;
+  }
+}
diff --git upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
new file mode 100644
index 0000000000..6511ac4dba
--- /dev/null
+++ upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
@@ -0,0 +1,153 @@
+package org.apache.hadoop.hive.upgrade.acid;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestUpgradeTool {
+  private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class);
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+      File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+  @Test
+  public void testUpgrade() throws Exception {
+    MapRedTask t = null;
+    int[][] data = {{1,2}, {3, 4}, {5, 6}};
+    int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+    runStatementOnDriver("drop table if exists TAcid");
+    runStatementOnDriver("drop table if exists TAcidPart");
+    runStatementOnDriver("drop table if exists TFlat");
+    runStatementOnDriver("drop table if exists TFlatText");
+//    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+
+    runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) clustered by (b) into 2 buckets stored" +
+        " as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+    runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+
+    //this needs major compaction
+    runStatementOnDriver("insert into TAcid" + makeValuesClause(data));
+    runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+    //this table needs to be converted to Acid
+    runStatementOnDriver("insert into TFlat" + makeValuesClause(data));
+
+    //this table needs to be converted to MM
+    runStatementOnDriver("insert into TFlatText" + makeValuesClause(data));
+
+    //p=10 needs major compaction
+    runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
+    runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+    //todo: add partitioned table that needs conversion to MM/Acid
+
+    //todo: rename files case
+    String[] args = new String[1];
+    args[0] = "-prepareAcidUpgrade";
+//    HiveMetaTool.main(args);
+    UpgradeTool.main(args);
+  }
+  static String makeValuesClause(int[][] rows) {
+    assert rows.length > 0;
+    StringBuilder sb = new StringBuilder(" values");
+    for(int[] row : rows) {
+      assert row.length > 0;
+      if(row.length > 1) {
+        sb.append("(");
+      }
+      for(int value : row) {
+        sb.append(value).append(",");
+      }
+      sb.setLength(sb.length() - 1);//remove trailing comma
+      if(row.length > 1) {
+        sb.append(")");
+      }
+      sb.append(",");
+    }
+    sb.setLength(sb.length() - 1);//remove trailing comma
+    return sb.toString();
+  }
+
+  List<String> runStatementOnDriver(String stmt) throws Exception {
+    CommandProcessorResponse cpr = d.run(stmt);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(stmt + " failed: " + cpr);
+    }
+    List<String> rs = new ArrayList<>();
+    d.getResults(rs);
+    return rs;
+  }
+  @Before
+  public void setUp() throws Exception {
+    setUpInternal();
+  }
+  void initHiveConf() {
+    hiveConf = new HiveConf(this.getClass());
+  }
+  @Rule
+  public TestName testName = new TestName();
+  HiveConf hiveConf;
+  Driver d;
+  void setUpInternal() throws Exception {
+    initHiveConf();
+    Path workDir = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp"));
+    hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName() +
+        File.separator + "mapred" + File.separator + "local");
+    hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName() +
+        File.separator + "mapred" + File.separator + "system");
+    hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName() +
+        File.separator + "mapred" + File.separator + "staging");
+    hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName() +
+        File.separator + "mapred" + File.separator + "temp");
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    hiveConf
+        .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+            "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.prepDb();
+    File f = new File(getWarehouseDir());
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(getWarehouseDir()).mkdirs())) {
+      throw new RuntimeException("Could not create " + getWarehouseDir());
+    }
+    SessionState ss = SessionState.start(hiveConf);
+    ss.applyAuthorizationPolicy();
+    d = new Driver(new QueryState(hiveConf), null);
+    d.setMaxRows(10000);
+//    dropTables();
+  }
+  String getWarehouseDir() {
+    return getTestDataDir() + "/warehouse";
+  }
+}
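Note (not part of the patch): for the tables created in testUpgrade() above, and assuming they live in
the 'default' database, the tool should emit roughly the following (file names carry a timestamp):

  compacts_<ts>.sql:      ALTER TABLE default.tacid COMPACT 'major';
                          ALTER TABLE default.tacidpart PARTITION(p=10) COMPACT 'major';   (only p=10 had update events)
  convertToAcid_<ts>.sql: ALTER TABLE default.tflat SET TBLPROPERTIES ('transactional'='true');   (ORC, unsorted, managed)
  convertToMM_<ts>.sql:   ALTER TABLE default.tflattext SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only');   (textfile cannot be made full Acid)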