diff --git pom.xml pom.xml
index 4fb83c9df0..4e6feefc4a 100644
--- pom.xml
+++ pom.xml
@@ -106,7 +106,7 @@
2.4
2.4
3.1.0
- 2.20.1
+ 2.21.0
2.4
2.8
2.9
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 0e53697be2..0576c02a8b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.tools.HiveMetaTool;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -535,4 +536,39 @@ public void testMMExportAborted() throws Exception {
TestTxnCommands2.stringifyValues(data), rs);
}
+ @Test
+ public void testMetaTool() throws Exception {
+ int[][] data = {{1,2}, {3, 4}, {5, 6}};
+ int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ runStatementOnDriver("drop table if exists TAcid");
+ runStatementOnDriver("drop table if exists TAcidPart");
+ runStatementOnDriver("drop table if exists TFlat");
+ runStatementOnDriver("drop table if exists TFlatText");
+ runStatementOnDriver("create table TAcid (a int, b int) stored as orc");
+ runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) stored" +
+ " as orc");
+ runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+ runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+
+ //this needs major compaction
+ runStatementOnDriver("insert into TAcid" + TestTxnCommands2.makeValuesClause(data));
+ runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+ //this table needs to be converted to Acid
+ runStatementOnDriver("insert into TFlat" + TestTxnCommands2.makeValuesClause(data));
+
+ //this table needs to be converted to MM
+ runStatementOnDriver("insert into TFlatText" + TestTxnCommands2.makeValuesClause(data));
+
+ //p=10 needs major compaction
+ runStatementOnDriver("insert into TAcidPart" + TestTxnCommands2.makeValuesClause(dataPart));
+ runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+
+ //todo: rename files case
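+ //note: the tool currently writes its compacts_*.sql, convertToAcid_*.sql, convertToMM_*.sql and
+ //normalizeFileNames_*.sh scripts into the current working directory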
+ String[] args = {"-prepareAcidUpgrade"};
+ HiveMetaTool.main(args);
+ }
}
diff --git standalone-metastore/pom.xml standalone-metastore/pom.xml
index c340fe2d24..10b1bfadf3 100644
--- standalone-metastore/pom.xml
+++ standalone-metastore/pom.xml
@@ -80,6 +80,7 @@
0.9.3
2.8.2
1.10.19
+ <orc.version>1.4.3</orc.version>
2.5.0
1.3.0
2.6.0-SNAPSHOT
@@ -92,6 +93,21 @@
+ <dependency>
+   <groupId>org.apache.orc</groupId>
+   <artifactId>orc-core</artifactId>
+   <version>${orc.version}</version>
+   <exclusions>
+     <exclusion>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-common</artifactId>
+     </exclusion>
+     <exclusion>
+       <groupId>org.apache.hive</groupId>
+       <artifactId>hive-storage-api</artifactId>
+     </exclusion>
+   </exclusions>
+ </dependency>
com.fasterxml.jackson.core
jackson-databind
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index f4eacd5fb9..6d1c6736f2 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -18,8 +18,13 @@
package org.apache.hadoop.hive.metastore.tools;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,7 +39,22 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -104,6 +124,12 @@ private void init() {
.withDescription("Specify the key for table property to be updated. tablePropKey option " +
"is valid only with updateLocation option.")
.create("tablePropKey");
+ Option prepareAcidUpgrade =
+ OptionBuilder.withArgName("find-compactions")
+ .hasOptionalArg() //directory to output results to
+ .withDescription("Generates a set Compaction commands to run to prepare for Hive 2.x" +
+ " to 3.0 upgrade")
+ .create("prepareAcidUpgrade");
cmdLineOptions.addOption(help);
cmdLineOptions.addOption(listFSRoot);
@@ -112,6 +138,7 @@ private void init() {
cmdLineOptions.addOption(dryRun);
cmdLineOptions.addOption(serdePropKey);
cmdLineOptions.addOption(tablePropKey);
+ cmdLineOptions.addOption(prepareAcidUpgrade);
}
private void initObjectStore(Configuration conf) {
@@ -362,7 +389,230 @@ public void updateFSRootLocation(URI oldURI, URI newURI, String serdePropKey, St
printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun);
}
}
+ private void prepareAcidUpgrade(HiveMetaTool metaTool) {
+ try {
+ prepareAcidUpgradeInternal();
+ }
+ catch(TException|IOException ex) {
+ System.err.println(StringUtils.stringifyException(ex));
+ printAndExit(metaTool);
+ }
+ }
+ /**
+ * todo: make sure compaction queue is configured and has ample capacity
+ * todo: what to do on failure? Suppose some table/part is not readable - should it still produce
+ * commands for all other tables?
+ * todo: should probably support a dryRun mode where we output scripts but, instead of renaming
+ * files, we generate a renaming script. Alternatively, always generate a renaming script and have
+ * the user execute it - this is probably the better option. If the script is not empty on rerun,
+ * someone added files to a table that is to be made Acid.
+ * todo: how do we test this? Even if we had 2.x data it won't be readable in 3.0. Even w/o any
+ * updates, txnids in the data won't make sense in 3.0 w/o actually performing the equivalent of a
+ * metastore upgrade to init the writeid table. Also, can we even create a new table and set its
+ * location to existing files to simulate a 2.x table?
+ *
+ * @throws MetaException
+ * @throws TException
+ */
+ private void prepareAcidUpgradeInternal() throws MetaException, TException, IOException {
+ Configuration conf = MetastoreConf.newMetastoreConf();
+ initObjectStore(conf);//todo: not actually using this
+ System.out.println("Looking for Acid tables that need to be compacted");
+ //todo: check if acid is enabled and bail if not
+ //todo: check that running on 2.x?
+ HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+ List<String> databases = hms.getAllDatabases();//TException
+ System.out.println("Found " + databases.size() + " databases to process");
+ List<String> compactions = new ArrayList<>();
+ List<String> convertToAcid = new ArrayList<>();
+ List<String> convertToMM = new ArrayList<>();
+ final String scriptLocation = ".";//todo: get this from input
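+ //walk every table in every database and collect the commands needed to prepare it for 3.0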
+ for(String dbName : databases) {
+ List<String> tables = hms.getAllTables(dbName);
+ System.out.println("found " + tables.size() + " tables in " + dbName);
+ for(String tableName : tables) {
+ Table t = hms.getTable(dbName, tableName);
+
+ //ql depends on metastore and is not accessible here... and if it was, I would not be using
+ //2.6 exec jar, but 3.0.... which is not what we want
+ List<String> compactionCommands = getCompactionCommands(t, conf, hms);
+ compactions.addAll(compactionCommands);
+ processConversion(t, convertToAcid, convertToMM);
+ /*todo: handle renaming files somewhere
+ * */
+ }
+ }
+ makeCompactionScript(compactions, scriptLocation);
+ makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+ makeRenameFileScript(scriptLocation);
+ }
+ private static void processConversion(Table t, List<String> convertToAcid, List<String> convertToMM) {
+ if(isFullAcidTable(t)) {
+ return;
+ }
+ //todo: check that it's a MANAGED table, not a view, etc
+ if(t.getPartitionKeysSize() <= 0) {
+ t.getSd().getInputFormat(); //todo: this is a string so we need to load the class to check isAssignable,
+ // i.e. implements AcidInputFormat/AcidOutputFormat so... ClassLoader issues?
+ t.getSd().getOutputFormat();
+ }
+ else {
+ //have to check each partition's StorageDescriptor
+ }
+ }
+ /**
+ * Currently writes to the current dir (whatever that is).
+ * If there is nothing to compact, it still outputs an empty file so that a run with nothing to
+ * do is not mistaken for a failed run.
+ * todo: add some config to tell it where to put the script
+ */
+ private static void makeCompactionScript(List<String> commands, String scriptLocation) throws IOException {
+ createScript(commands, "compacts_" + System.currentTimeMillis() + ".sql", scriptLocation);
+ }
+ private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm, String scriptLocation) throws IOException {
+ createScript(alterTableAcid, "convertToAcid_" + System.currentTimeMillis() + ".sql", scriptLocation);
+ createScript(alterTableMm, "convertToMM_" + System.currentTimeMillis() + ".sql", scriptLocation);
+ }
+ private static void createScript(List<String> commands, String fileName, String scriptLocation) throws IOException {
+ //todo: make sure to create the file in 'scriptLocation' dir
+ FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
+ PrintWriter pw = new PrintWriter(fw);
+ for(String cmd : commands) {
+ pw.println(cmd + ";");
+ }
+ pw.close();//also closes the underlying FileWriter
+ }
+ private static void makeRenameFileScript(String scriptLocation) throws IOException {
+ createScript(Collections.emptyList(), "normalizeFileNames_" + System.currentTimeMillis() + ".sh", scriptLocation);
+ }
+ /**
+ * @return any compaction commands to run
+ */
+ private static List<String> getCompactionCommands(Table t, Configuration conf, HiveMetaStoreClient hms) throws IOException, TException {
+ if(!isFullAcidTable(t)) {
+ return Collections.emptyList();
+ }
+ if(t.getPartitionKeysSize() <= 0) {
+ //not partitioned
+ if(!needsCompaction(new Path(t.getSd().getLocation()), conf)) {
+ return Collections.emptyList();
+ }
+
+ List<String> cmds = new ArrayList<>();
+ cmds.add(getCompactionCommand(t, null));
+ return cmds;
+ }
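+ //fetch partition objects from the metastore in batches of 'batchSize' names at a time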
+ List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+ int batchSize = 1000;//todo: right size?
+ int numWholeBatches = partNames.size()/batchSize;
+ List<String> compactionCommands = new ArrayList<>();
+ for(int i = 0; i < numWholeBatches; i++) {
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize));
+ for(Partition p : partitionList) {
+ if(needsCompaction(new Path(p.getSd().getLocation()), conf)) {
+ compactionCommands.add(getCompactionCommand(t, p));
+ }
+ }
+ }
+ if(numWholeBatches * batchSize < partNames.size()) {
+ //last partial batch
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size()));
+ for (Partition p : partitionList) {
+ if (needsCompaction(new Path(p.getSd().getLocation()), conf)) {
+ compactionCommands.add(getCompactionCommand(t, p));
+ }
+ }
+ }
+ return compactionCommands;
+ }
+ /**
+ * Checks the delta files under the given location for update/delete events.
+ * @param location - path to a partition (or table if not partitioned) dir
+ * @param conf
+ * @return true if any delta under {@code location} contains update or delete events and thus
+ * needs major compaction before the upgrade
+ * @throws IOException
+ */
+ private static boolean needsCompaction(Path location, Configuration conf) throws IOException {
+ FileSystem fs = location.getFileSystem(conf);
+ FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //checking for delete_delta is only so that this functionality can be exercised by 3.0 code,
+ //which cannot produce any deltas with a mix of update/insert events
+ return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+ }
+ });
+ if(deltas == null || deltas.length == 0) {
+ //base_n cannot contain update/delete. Original files are all 'insert'
+ return false;
+ }
+ deltaLoop: for(FileStatus delta : deltas) {
+ if(!delta.isDirectory()) {
+ //should never happen - just in case
+ continue;
+ }
+ FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+ //bucket_x or bucket_x__flush_length
+ return path.getName().startsWith("bucket_");
+ }
+ });
+ for(FileStatus bucket : buckets) {
+ if(bucket.getPath().getName().endsWith("_flush_length")) {
+ //streaming ingest dir - cannot have update/delete events
+ continue deltaLoop;
+ }
+ if(needsCompaction(bucket, fs)) {
+ //found delete events - this 'location' needs compacting
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+ //create reader, look at footer
+ //no need to check side file since it can only be in a streaming ingest delta
+ Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf())
+ .filesystem(fs));
+ AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
+ if(as == null) {
+ //should never happen since we are reading bucket_x written by acid write
+ throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+ }
+ return as.deletes > 0 || as.updates > 0;
+ }
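+ /**
+ * Generates a compaction command of the form
+ * "ALTER TABLE db.table [PARTITION(key1=val1,...)] COMPACT 'major'".
+ */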
+ private static String getCompactionCommand(Table t, Partition p) {
+ StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+ if(t.getPartitionKeysSize() > 0) {
+ assert p != null : "must supply partition for partitioned table " +
+ Warehouse.getQualifiedName(t);
+ sb.append(" PARTITION(");
+ for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+ //todo: should these be quoted? HiveUtils.unparseIdentifier() - if the value is a String it should definitely be quoted
+ sb.append(t.getPartitionKeys().get(i).getName()).append('=')
+ .append(p.getValues().get(i)).append(",");
+ }
+ sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+ }
+ return sb.append(" COMPACT 'major'").toString();
+ }
+ private static boolean isFullAcidTable(Table t) {
+ if (t.getParametersSize() <= 0) {
+ //cannot be acid
+ return false;
+ }
+ String transactionalValue = t.getParameters()
+ .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if (transactionalValue != null && "true".equalsIgnoreCase(transactionalValue)) {
+ System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+ return true;
+ }
+ return false;
+ }
private static void printAndExit(HiveMetaTool metaTool) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("metatool", metaTool.cmdLineOptions);
@@ -460,7 +710,16 @@ public static void main(String[] args) {
} else {
metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun);
}
- } else {
+ } else if(line.hasOption("prepareAcidUpgrade")) {
+ String[] values = line.getOptionValues("prepareAcidUpgrade");
+ String targetDir = null;//todo: should be passed to prepareAcidUpgrade() as the script output directory
+ if(values != null && values.length > 0) {
+ if(values.length > 1) {
+ System.err.println("HiveMetaTool: prepareAcidUpgrade takes at most 1 argument");
+ }
+ }
+ metaTool.prepareAcidUpgrade(metaTool);
+ } else {
if (line.hasOption("dryRun")) {
System.err.println("HiveMetaTool: dryRun is not a valid standalone option");
} else if (line.hasOption("serdePropKey")) {