diff --git pom.xml pom.xml
index 1f43c416db..d19398f651 100644
--- pom.xml
+++ pom.xml
@@ -62,6 +62,7 @@
  <module>testutils</module>
  <module>packaging</module>
  <module>standalone-metastore</module>
+ <module>upgrade-acid</module>
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index 875eba3c67..283f945dc5 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -410,7 +410,7 @@ private void prepareAcidUpgrade(String scriptLocation) {
/**
* todo: make sure compaction queue is configured and has ample capacity
* todo: what to do on failure? Suppose some table/part is not readable. should it produce
- * todo: should probably suppor dryRun mode where we output scripts but instead of renaming files
+ * todo: should probably support dryRun mode where we output scripts but instead of renaming files
* we generate a renaming script. Alternatively, always generate a renaming script and have
* user execute it - this is probably a better option. If script is not empty on rerun someone
* added files to table to be made Acid.
@@ -427,6 +427,37 @@ private void prepareAcidUpgrade(String scriptLocation) {
* todo: this should accept a file of table names to exclude from non-acid to acid conversion
* todo: change script comments to a preamble instead of a footer
*
+ *
+ * How to make this blocking? For File renames and converting to Acid/MM we can do it synchronously
+ * for compactions we have to enqueue all of them and then wait for them to complete.
+ * Ideally we'd just wait for the compaction IDs that we started to reach 'ready for cleaning' or 'succeeded' state.
+ * BTW, what if one fails? Do we resubmit?
+ * What if for given tbl/part there is prior failure? Can we tell the difference?
+ *
+ * DDLTask.compact() writes a msg with the ID to the console - this way we know what we enqueued, if we can parse it out
+ * Hive.compact2() returns CompactionResponse which has the ID - maybe we can schedule compacts manually
+ * Hive.get(conf) to get Hive
+ *
+ * can we instead do SHOW COMPACTIONS and get the highest ID before starting, then wait for everything above it to finish?
+ *
+ * start with basics - do SHOW COMPACTIONS, get the min ID, submit all, get the max ID, make sure everything in between has completed
+ *
+ *
+ *
+ * Embedded HS2 is just JDBC with appropriate URL
+ * https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-ConnectionURLforRemoteorEmbeddedMode
+ * Submitting compaction commands via JDBC will definitely not return any compaction IDs
+ *
+ * Thejas said that Hive.get(conf) should be fine as long as the right hive-site.xml is on the class path
+ * and the tool is run as the 'hive' user - this way it will be able to get to MySQL
+ *
+ * This should output scripts in all cases to provide a record of what it's doing
+ * For blocking compaction submission, use Hive.compact2() directly to get compaction ID and wait for it
+ * This will happen with 2.6 Hive binaries on the classpath
+ * how does the rename script work? "hadoop fs -mv oldname newname" - and what about S3?
+ * How does this actually get executed?
+ * all other actions are done via embedded JDBC
+ *
* @throws MetaException
* @throws TException
*/
@@ -448,7 +479,7 @@ private void prepareAcidUpgradeInternal(String scriptLocation) throws MetaExcept
for(String tableName : tables) {
Table t = hms.getTable(dbName, tableName);
- //ql depends on metastore and is not accessible here... and if it was, I would not be using
+ //ql depends on metastore and is not accessible here... and if it was, it would not be using
//2.6 exec jar, but 3.0.... which is not what we want
List<String> compactionCommands = getCompactionCommands(t, conf, hms, compactionMetaInfo);
compactions.addAll(compactionCommands);
@@ -705,6 +736,7 @@ public boolean accept(Path path) {
if(needsCompaction(bucket, fs)) {
//found delete events - this 'location' needs compacting
compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+ //todo: this is not remotely accurate if you have many (relevant) original files
return true;
}
}
@@ -738,6 +770,7 @@ private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws
}
return as.deletes > 0 || as.updates > 0;
}
+ //todo: add option to submit directly and a Set to add compaction id
private static String getCompactionCommand(Table t, Partition p) {
StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
if(t.getPartitionKeysSize() > 0) {
@@ -745,7 +778,7 @@ private static String getCompactionCommand(Table t, Partition p) {
Warehouse.getQualifiedName(t);
sb.append(" PARTITION(");
for (int i = 0; i < t.getPartitionKeysSize(); i++) {
- //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote
+ //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote - check SemanticAnalyzer
sb.append(t.getPartitionKeys().get(i).getName()).append('=')
.append(p.getValues().get(i)).append(",");
}
diff --git upgrade-acid/pom.xml upgrade-acid/pom.xml
new file mode 100644
index 0000000000..02f02a407a
--- /dev/null
+++ upgrade-acid/pom.xml
@@ -0,0 +1,467 @@
+
+
+
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>18</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.hive</groupId>
+  <version>4.0.0-SNAPSHOT</version>
+  <artifactId>upgrade-acid</artifactId>
+  <name>Hive Upgrade Acid</name>
+  <packaging>jar</packaging>
+
+
+
+ UTF-8
+ 1.8
+ 1.8
+ false
+ ${settings.localRepository}
+ 2.3
+ 1.6.0
+ ..
+
+ 1.0b3
+ 1.7
+ ${basedir}/checkstyle/
+ 2.17
+ 2.20.1
+
+
+ ${project.build.directory}/testconf
+ file://
+ ${project.basedir}/src/test/resources
+ ${project.build.directory}/tmp
+ ${project.build.directory}/warehouse
+ file://
+ 1
+ true
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>1.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>2.3.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>2.3.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.7.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>2.7.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>1.3.3</version>
+    </dependency>
+  </dependencies>
+
+
+
+
+
+
+ ${basedir}/src/main/resources
+
+ package.jdo
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ ${maven.antrun.plugin.version}
+
+
+ ant-contrib
+ ant-contrib
+ ${ant.contrib.version}
+
+
+ ant
+ ant
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ ${maven.checkstyle.plugin.version}
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${maven.exec.plugin.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ setup-test-dirs
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ setup-metastore-scripts
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 2.20.1
+
+
+
+ integration-test
+ verify
+
+
+
+
+ true
+ false
+ -Xmx2048m
+ false
+
+ true
+ ${test.tmp.dir}
+ ${test.tmp.dir}
+ true
+
+
+ ${log4j.conf.dir}
+
+ ${skipITests}
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven.surefire.version}
+
+ true
+ false
+ ${test.forkcount}
+ -Xmx2048m
+ false
+
+ ${project.build.directory}
+ true
+ ${derby.version}
+ ${test.tmp.dir}/derby.log
+
+ ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties
+ true
+ ${test.tmp.dir}
+
+ jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true
+ false
+ ${test.tmp.dir}
+ ${test.warehouse.scheme}${test.warehouse.dir}
+
+
+
+ ${log4j.conf.dir}
+ ${test.conf.dir}
+
+ ${test.conf.dir}/conf
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
new file mode 100644
index 0000000000..651beaef1c
--- /dev/null
+++ upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
@@ -0,0 +1,457 @@
+package org.apache.hadoop.hive.upgrade.acid;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Needs pre-upgrade and post-upgrade modes since, for example, we don't want to run compaction after the upgrade.
+ * Support a dry-run mode - this just generates all the scripts; otherwise they are executed
+ * (a sketch of the command-line wiring follows main() below).
+ */
+public class UpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
+ private final Options cmdLineOptions = new Options();
+
+ public static void main(String[] args) throws Exception {
+ CommandLineParser parser = new GnuParser();
+ CommandLine line = null;
+ UpgradeTool tool = new UpgradeTool();
+ try {
+ tool.prepareAcidUpgradeInternal(".");
+ }
+ catch(Exception ex) {
+ LOG.error("UpgradeTool failed", ex);
+ throw ex;
+ }
+ }
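+
+ /**
+ * Illustrative sketch only: how the commons-cli objects created in main() might eventually be
+ * wired up. The option names are assumptions - "-prepareAcidUpgrade" matches what
+ * TestUpgradeTool passes today and "-dryRun" reflects the dry-run idea from the class javadoc;
+ * neither is actually handled by this patch.
+ */
+ private static CommandLine parseArgs(Options opts, String[] args)
+ throws org.apache.commons.cli.ParseException {
+ opts.addOption("prepareAcidUpgrade", false, "generate compaction/conversion/rename scripts");
+ opts.addOption("dryRun", false, "only generate scripts, do not execute anything");
+ return new GnuParser().parse(opts, args);
+ }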
+ private static class CompactionMetaInfo {
+ /**
+ * total number of bytes to be compacted across all compaction commands
+ */
+ long numberOfBytes;
+ //BlockStoragePolicySpi f;//todo: this is only in hadoop 3.0
+ }
+ /**
+ * todo: make sure compaction queue is configured and has ample capacity
+ * todo: what to do on failure? Suppose some table/part is not readable. should it produce
+ * todo: should probably support dryRun mode where we output scripts but instead of renaming files
+ * we generate a renaming script. Alternatively, always generate a renaming script and have
+ * user execute it - this is probably a better option. If script is not empty on rerun someone
+ * added files to table to be made Acid.
+ * commands for all other tables?
+ * todo: how do we test this? even if we had 2.x data it won't be readable in 3.0. even w/o any
+ * updates, txnids in the data won't make sense in 3.0 w/o actually performing equivalent of
+ * metastore upgrade to init writeid table. Also, can we even create a new table and set location
+ * to existing files to simulate a 2.x table?
+ * todo: generate some instructions in the compaction script to include the total number of compactions to perform,
+ * total data volume to handle and anything else that may help users guess at how long it will
+ * take. Also, highlight tuning options to speed this up.
+ * todo: can we make the script blocking so that it waits for cleaner to delete files?
+ * need to poll SHOW COMPACTIONS and make sure that all partitions are in "finished" state
+ * todo: this should accept a file of table names to exclude from non-acid to acid conversion
+ * todo: change script comments to a preamble instead of a footer
+ *
+ *
+ * How to make this blocking? For File renames and converting to Acid/MM we can do it synchronously
+ * for compactions we have to enqueue all of them and then wait for them to complete.
+ * Ideally we'd just wait for the compaction IDs that we started to reach 'ready for cleaning' or 'succeeded' state.
+ * BTW, what if one fails? Do we resubmit?
+ * What if for given tbl/part there is prior failure? Can we tell the difference?
+ *
+ * DDLTask.compact() writes a msg with the ID to the console - this way we know what we enqueued, if we can parse it out
+ * Hive.compact2() returns CompactionResponse which has the ID - maybe we can schedule compacts manually
+ * Hive.get(conf) to get Hive
+ *
+ * can we instead do SHOW COMPACTIONS and get the highest ID before starting, then wait for everything above it to finish?
+ *
+ * start with basics - do SHOW COMPACTIONS, get the min ID, submit all, get the max ID, make sure everything in between has completed
+ * (an illustrative polling sketch follows prepareAcidUpgradeInternal() below)
+ *
+ *
+ *
+ * Embedded HS2 is just JDBC with appropriate URL
+ * https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-ConnectionURLforRemoteorEmbeddedMode
+ * Submitting compaction commands via JDBC will definitely not return any compaction IDs
+ *
+ * Thejas said that Hive.get(conf) should be fine as long as the right hive-site.xml is on the class path
+ * and the tool is run as the 'hive' user - this way it will be able to get to MySQL
+ *
+ * This should output scripts in all cases to provide a record of what it's doing
+ * For blocking compaction submission, use Hive.compact2() directly to get compaction ID and wait for it
+ * This will happen with 2.6 Hive binaries on the classpath
+ * how does the rename script work? "hadoop fs -mv oldname newname" - and what about S3?
+ * (a sketch of such rename lines follows makeRenameFileScript() below)
+ * How does this actually get executed?
+ * all other actions are done via embedded JDBC
+ *
+ * @throws MetaException
+ * @throws TException
+ */
+ private void prepareAcidUpgradeInternal(String scriptLocation) throws MetaException, TException, IOException {
+ //Configuration conf = MetastoreConf.newMetastoreConf();
+ HiveConf conf = new HiveConf();
+ System.out.println("Looking for Acid tables that need to be compacted");
+ //todo: check if acid is enabled and bail if not
+ //todo: check that running on 2.x?
+ HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+ List<String> databases = hms.getAllDatabases();//TException
+ System.out.println("Found " + databases.size() + " databases to process");
+ List<String> compactions = new ArrayList<>();
+ List<String> convertToAcid = new ArrayList<>();
+ List<String> convertToMM = new ArrayList<>();
+ final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+ for(String dbName : databases) {
+ List<String> tables = hms.getAllTables(dbName);
+ System.out.println("found " + tables.size() + " tables in " + dbName);
+ for(String tableName : tables) {
+ Table t = hms.getTable(dbName, tableName);
+
+ //ql depends on metastore and is not accessible here... and if it was, it would not be using
+ //2.6 exec jar, but 3.0.... which is not what we want
+ List<String> compactionCommands = getCompactionCommands(t, conf, hms, compactionMetaInfo);
+ compactions.addAll(compactionCommands);
+ processConversion(t, convertToAcid, convertToMM, hms);
+ //todo: handle renaming files somewhere
+ }
+ }
+ makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
+ makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+ makeRenameFileScript(scriptLocation);
+ }
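+
+ /**
+ * Illustrative sketch only (not called anywhere yet) of the "poll SHOW COMPACTIONS" idea from
+ * the javadoc above. It identifies compactions by db/table/partition rather than by ID, and the
+ * terminal state strings ("succeeded", "ready for cleaning", "failed") are an assumption about
+ * what the compactor reports - nothing here is implemented by this patch.
+ */
+ private static void waitForCompactions(HiveMetaStoreClient hms, java.util.Set<String> pendingKeys)
+ throws TException, InterruptedException {
+ while (!pendingKeys.isEmpty()) {
+ org.apache.hadoop.hive.metastore.api.ShowCompactResponse rsp = hms.showCompactions();
+ for (org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement e : rsp.getCompacts()) {
+ String key = e.getDbname() + "." + e.getTablename() +
+ (e.getPartitionname() == null ? "" : "/" + e.getPartitionname());
+ String state = e.getState();
+ if (pendingKeys.contains(key) && ("succeeded".equalsIgnoreCase(state) ||
+ "ready for cleaning".equalsIgnoreCase(state) || "failed".equalsIgnoreCase(state))) {
+ //this tbl/part is done (or terminally failed) - stop waiting for it
+ pendingKeys.remove(key);
+ }
+ }
+ Thread.sleep(30 * 1000);//poll every 30 seconds
+ }
+ }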
+ //todo: handle exclusion list
+ private static void processConversion(Table t, List<String> convertToAcid,
+ List<String> convertToMM, HiveMetaStoreClient hms) throws TException {
+ if(isFullAcidTable(t)) {
+ return;
+ }
+ if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
+ return;
+ }
+ String fullTableName = Warehouse.getQualifiedName(t);
+ if(t.getPartitionKeysSize() <= 0) {
+ if(canBeMadeAcid(fullTableName, t.getSd())) {
+ convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true')");
+ }
+ else {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ }
+ }
+ else {
+ List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+ int batchSize = 10000;//todo: right size?
+ int numWholeBatches = partNames.size()/batchSize;
+ for(int i = 0; i < numWholeBatches; i++) {
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(i * batchSize, (i + 1) * batchSize));
+ for(Partition p : partitionList) {
+ if(!canBeMadeAcid(fullTableName, p.getSd())) {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ return;
+ }
+ }
+ }
+ if(numWholeBatches * batchSize < partNames.size()) {
+ //last partial batch
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(numWholeBatches * batchSize, partNames.size()));
+ for (Partition p : partitionList) {
+ if (!canBeMadeAcid(fullTableName, p.getSd())) {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ return;
+ }
+ }
+ }
+ //if here checked all parts and they are Acid compatible - make it acid
+ convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true')");
+ }
+ }
+ private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
+ return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
+ }
+ private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
+ try {
+ Class<?> inputFormatClass = sd.getInputFormat() == null ? null :
+ Class.forName(sd.getInputFormat());
+ Class<?> outputFormatClass = sd.getOutputFormat() == null ? null :
+ Class.forName(sd.getOutputFormat());
+
+ if (inputFormatClass != null && outputFormatClass != null &&
+ Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
+ .isAssignableFrom(inputFormatClass) &&
+ Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
+ .isAssignableFrom(outputFormatClass)) {
+ return true;
+ }
+ } catch (ClassNotFoundException e) {
+ //if a table is using some custom I/O format and it's not in the classpath, we won't mark
+ //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
+ //Acid format
+ System.err.println("Could not determine if " + fullTableName +
+ " can be made Acid due to: " + e.getMessage());
+ return false;
+ }
+ return false;
+ }
+ /**
+ * currently writes to current dir (whatever that is).
+ * If there is nothing to compact, outputs empty file so as not to confuse the output with a
+ * failed run.
+ * todo: add some config to tell it where to put the script
+ */
+ private static void makeCompactionScript(List<String> commands, String scriptLocation,
+ CompactionMetaInfo compactionMetaInfo) throws IOException {
+ if (commands.isEmpty()) {
+ System.out.println("No compaction is necessary");
+ return;
+ }
+ String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
+ System.out.println("Writing compaction commands to " + fileName);
+ try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
+ //add post script
+ pw.println("-- Generated total of " + commands.size() + " compaction commands");
+ if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+ //to see it working in UTs
+ pw.println("-- The total volume of data to be compacted is " +
+ String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+ }
+ else {
+ pw.println("-- The total volume of data to be compacted is " +
+ String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+ }
+ pw.println();
+ pw.println(
+ "-- Please note that compaction may be a heavyweight and time consuming process.\n" +
+ "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
+ "-- which they will be picked up by compactor Workers. The max number of\n" +
+ "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
+ "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" +
+ "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
+ "-- property if defined or 'default' if not defined. It's advisable to set the\n" +
+ "-- capacity of this queue appropriately");
+ }
+ }
+ private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
+ String scriptLocation) throws IOException {
+ if (alterTableAcid.isEmpty()) {
+ System.out.println("No acid conversion is necessary");
+ } else {
+ String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
+ System.out.println("Writing acid conversion commands to " + fileName);
+ try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
+ //todo: fix this - it has to run in 3.0 since tables may be unbucketed
+ pw.println("-- These commands may be executed by Hive 1.x later");
+ }
+ }
+
+ if (alterTableMm.isEmpty()) {
+ System.out.println("No managed table conversion is necessary");
+ } else {
+ String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
+ System.out.println("Writing managed table conversion commands to " + fileName);
+ try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
+ pw.println("-- These commands must be executed by Hive 3.0 or later");
+ }
+ }
+ }
+
+ private static PrintWriter createScript(List<String> commands, String fileName,
+ String scriptLocation) throws IOException {
+ //todo: make sure to create the file in 'scriptLocation' dir
+ FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
+ PrintWriter pw = new PrintWriter(fw);
+ for(String cmd : commands) {
+ pw.println(cmd + ";");
+ }
+ return pw;
+ }
+ private static void makeRenameFileScript(String scriptLocation) throws IOException {
+ List<String> commands = Collections.emptyList();
+ if (commands.isEmpty()) {
+ System.out.println("No file renaming is necessary");
+ } else {
+ String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
+ System.out.println("Writing file renaming commands to " + fileName);
+ PrintWriter pw = createScript(commands, fileName, scriptLocation);
+ pw.close();
+ }
+ }
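+
+ /**
+ * Illustrative only: the kind of line the rename script above might contain once the
+ * "handle renaming files" todo is addressed. The old->new mapping is supplied by the caller
+ * (computing it is still the open question); the "hadoop fs -mv oldname newname" form comes from
+ * the javadoc above and also works against s3a paths, though there a rename is a copy+delete
+ * rather than an atomic move.
+ */
+ private static List<String> toRenameCommands(java.util.Map<Path, Path> renames) {
+ List<String> commands = new ArrayList<>(renames.size());
+ for (java.util.Map.Entry<Path, Path> e : renames.entrySet()) {
+ commands.add("hadoop fs -mv " + e.getKey().toUri() + " " + e.getValue().toUri());
+ }
+ return commands;
+ }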
+ /**
+ * @return any compaction commands to run for {@code Table t}
+ */
+ private static List<String> getCompactionCommands(Table t, HiveConf conf,
+ HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo)
+ throws IOException, TException {
+ if(!isFullAcidTable(t)) {
+ return Collections.emptyList();
+ }
+ if(t.getPartitionKeysSize() <= 0) {
+ //not partitioned
+ if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo)) {
+ return Collections.emptyList();
+ }
+
+ List<String> cmds = new ArrayList<>();
+ cmds.add(getCompactionCommand(t, null));
+ return cmds;
+ }
+ List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+ int batchSize = 10000;//todo: right size?
+ int numWholeBatches = partNames.size()/batchSize;
+ List compactionCommands = new ArrayList<>();
+ for(int i = 0; i < numWholeBatches; i++) {
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize));
+ for(Partition p : partitionList) {
+ if(needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+ compactionCommands.add(getCompactionCommand(t, p));
+ }
+ }
+ }
+ if(numWholeBatches * batchSize < partNames.size()) {
+ //last partial batch
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size()));
+ for (Partition p : partitionList) {
+ if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+ compactionCommands.add(getCompactionCommand(t, p));
+ }
+ }
+ }
+ return compactionCommands;
+ }
+ /**
+ *
+ * @param location - path to a partition (or table if not partitioned) dir
+ */
+ private static boolean needsCompaction(Path location, HiveConf conf,
+ CompactionMetaInfo compactionMetaInfo) throws IOException {
+ FileSystem fs = location.getFileSystem(conf);
+ FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //checking for delete_delta is only so that this functionality can be exercised by 3.0 code,
+ //which cannot produce any deltas with a mix of update/insert events
+ return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+ }
+ });
+ if(deltas == null || deltas.length == 0) {
+ //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
+ //only if there are update/delete events.
+ return false;
+ }
+ deltaLoop: for(FileStatus delta : deltas) {
+ if(!delta.isDirectory()) {
+ //should never happen - just in case
+ continue;
+ }
+ FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+ //bucket_x or bucket_x__flush_length
+ return path.getName().startsWith("bucket_");
+ }
+ });
+ for(FileStatus bucket : buckets) {
+ if(bucket.getPath().getName().endsWith("_flush_length")) {
+ //streaming ingest dir - cannot have update/delete events
+ continue deltaLoop;
+ }
+ if(needsCompaction(bucket, fs)) {
+ //found delete events - this 'location' needs compacting
+ compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+ //todo: this is not remotely accurate if you have many (relevant) original files
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param location - path to a partition (or table if not partitioned) dir
+ * @throws IOException
+ */
+ private static long getDataSize(Path location, HiveConf conf) throws IOException {
+ /*
+ * todo: Figure out the size of the partition. The
+ * best way is to getAcidState() and look at each file - this way it takes care of
+ * original files vs base and any other obsolete files. For now just brute force it,
+ * it's likely close enough for a rough estimate.*/
+ FileSystem fs = location.getFileSystem(conf);
+ ContentSummary cs = fs.getContentSummary(location);
+ return cs.getLength();
+ }
+ private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+ //create reader, look at footer
+ //no need to check side file since it can only be in a streaming ingest delta
+ Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf())
+ .filesystem(fs));
+ AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
+ if(as == null) {
+ //should never happen since we are reading bucket_x written by acid write
+ throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+ }
+ return as.deletes > 0 || as.updates > 0;
+ }
+ //todo: add option to submit compactions directly and a Set<Long> to collect compaction IDs (see the illustrative sketch after getCompactionCommand() below)
+ private static String getCompactionCommand(Table t, Partition p) {
+ StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+ if(t.getPartitionKeysSize() > 0) {
+ assert p != null : "must supply partition for partitioned table " +
+ Warehouse.getQualifiedName(t);
+ sb.append(" PARTITION(");
+ for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+ //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote - check SemanticAnalyzer
+ sb.append(t.getPartitionKeys().get(i).getName()).append('=')
+ .append(p.getValues().get(i)).append(",");
+ }
+ sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+ }
+ return sb.append(" COMPACT 'major'").toString();
+ }
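+
+ /**
+ * Illustrative sketch for the "submit directly" todo above getCompactionCommand() - not used yet.
+ * It assumes the compact2() call mentioned in the class javadoc is also available on the 2.x
+ * metastore client (IMetaStoreClient.compact2()), returning a CompactionResponse that carries
+ * the enqueued compaction's ID; collecting those IDs is what would let the tool block on exactly
+ * the compactions it submitted.
+ */
+ private static void submitCompaction(HiveMetaStoreClient hms, Table t, Partition p,
+ java.util.Set<Long> compactionIds) throws TException {
+ String partName = p == null ? null : Warehouse.makePartName(t.getPartitionKeys(), p.getValues());
+ org.apache.hadoop.hive.metastore.api.CompactionResponse resp = hms.compact2(t.getDbName(),
+ t.getTableName(), partName, org.apache.hadoop.hive.metastore.api.CompactionType.MAJOR, null);
+ compactionIds.add(resp.getId());
+ }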
+ private static boolean isFullAcidTable(Table t) {
+ if (t.getParametersSize() <= 0) {
+ //cannot be acid
+ return false;
+ }
+ String transactionalValue = t.getParameters()
+ .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if (transactionalValue != null && "true".equalsIgnoreCase(transactionalValue)) {
+ System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+ return true;
+ }
+ return false;
+ }
+}
diff --git upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
new file mode 100644
index 0000000000..6511ac4dba
--- /dev/null
+++ upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
@@ -0,0 +1,153 @@
+package org.apache.hadoop.hive.upgrade.acid;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestUpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+ @Test
+ public void testUpgrade() throws Exception {
+ MapRedTask t = null;
+ int[][] data = {{1,2}, {3, 4}, {5, 6}};
+ int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ runStatementOnDriver("drop table if exists TAcid");
+ runStatementOnDriver("drop table if exists TAcidPart");
+ runStatementOnDriver("drop table if exists TFlat");
+ runStatementOnDriver("drop table if exists TFlatText");
+// runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+
+ runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) clustered by (b) into 2 buckets stored" +
+ " as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+ runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+
+ //this needs major compaction
+ runStatementOnDriver("insert into TAcid" + makeValuesClause(data));
+ runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+ //this table needs to be converted to Acid
+ runStatementOnDriver("insert into TFlat" + makeValuesClause(data));
+
+ //this table needs to be converted to MM
+ runStatementOnDriver("insert into TFlatText" + makeValuesClause(data));
+
+ //p=10 needs major compaction
+ runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
+ runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+ //todo: add partitioned table that needs conversion to MM/Acid
+
+ //todo: rename files case
+ String[] args = new String[1];
+ args[0] = "-prepareAcidUpgrade";
+// HiveMetaTool.main(args);
+ UpgradeTool.main(args);
+ }
+ static String makeValuesClause(int[][] rows) {
+ assert rows.length > 0;
+ StringBuilder sb = new StringBuilder(" values");
+ for(int[] row : rows) {
+ assert row.length > 0;
+ if(row.length > 1) {
+ sb.append("(");
+ }
+ for(int value : row) {
+ sb.append(value).append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ if(row.length > 1) {
+ sb.append(")");
+ }
+ sb.append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ return sb.toString();
+ }
+
+ List<String> runStatementOnDriver(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(stmt + " failed: " + cpr);
+ }
+ List<String> rs = new ArrayList<String>();
+ d.getResults(rs);
+ return rs;
+ }
+ @Before
+ public void setUp() throws Exception {
+ setUpInternal();
+ }
+ void initHiveConf() {
+ hiveConf = new HiveConf(this.getClass());
+ }
+ @Rule
+ public TestName testName = new TestName();
+ HiveConf hiveConf;
+ Driver d;
+ void setUpInternal() throws Exception {
+ initHiveConf();
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "local");
+ hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "system");
+ hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "staging");
+ hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "temp");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ hiveConf
+ .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.prepDb();
+ File f = new File(getWarehouseDir());
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(getWarehouseDir()).mkdirs())) {
+ throw new RuntimeException("Could not create " + getWarehouseDir());
+ }
+ SessionState ss = SessionState.start(hiveConf);
+ ss.applyAuthorizationPolicy();
+ d = new Driver(new QueryState(hiveConf), null);
+ d.setMaxRows(10000);
+// dropTables();
+ }
+ String getWarehouseDir() {
+ return getTestDataDir() + "/warehouse";
+ }
+}