diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaTool.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaTool.java
index 5bd83ac128..4d19e8421f 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaTool.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaTool.java
@@ -252,6 +252,21 @@ private void checkAvroSchemaURLProps(Table table, String expectedURL) {
    assertEquals(expectedURL, table.getSd().getParameters().get(AvroSerdeUtils.SCHEMA_URL));
  }

+  public void testFindCompactions() throws Exception {
+    redirectOutputStream();
+    String[] args = new String[1];
+    args[0] = "-findCompactions";
+
+    try {
+      HiveMetaTool.main(args);
+      String out = os.toString();
+      assertNotNull(out);
+    } finally {
+      restoreOutputStream();
+      System.out.println("Completed testFindCompactions");
+    }
+
+  }
  @Override
  protected void tearDown() throws Exception {
    try {
diff --git standalone-metastore/pom.xml standalone-metastore/pom.xml
index c340fe2d24..10b1bfadf3 100644
--- standalone-metastore/pom.xml
+++ standalone-metastore/pom.xml
@@ -80,6 +80,7 @@
    <libthrift.version>0.9.3</libthrift.version>
    <log4j2.version>2.8.2</log4j2.version>
    <mockito-all.version>1.10.19</mockito-all.version>
+    <orc.version>1.4.3</orc.version>
    <protobuf.version>2.5.0</protobuf.version>
    <sqlline.version>1.3.0</sqlline.version>
    <storage-api.version>2.6.0-SNAPSHOT</storage-api.version>
@@ -92,6 +93,21 @@
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>${orc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index f4eacd5fb9..2b894d38b2 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -18,8 +18,13 @@
package org.apache.hadoop.hive.metastore.tools;

+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -34,7 +39,22 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -104,6 +124,10 @@ private void init() {
.withDescription("Specify the key for table property to be updated. tablePropKey option " +
"is valid only with updateLocation option.")
.create("tablePropKey");
+    Option findCompactions = OptionBuilder.withArgName("find-compactions")
+        .withDescription("Generates a set of compaction commands to run in preparation for the" +
+            " Hive 2.x to 3.0 upgrade")
+        .create("findCompactions");

    cmdLineOptions.addOption(help);
    cmdLineOptions.addOption(listFSRoot);
@@ -112,6 +136,7 @@ private void init() {
    cmdLineOptions.addOption(dryRun);
    cmdLineOptions.addOption(serdePropKey);
    cmdLineOptions.addOption(tablePropKey);
+    cmdLineOptions.addOption(findCompactions);
  }

  private void initObjectStore(Configuration conf) {
@@ -362,7 +387,200 @@ public void updateFSRootLocation(URI oldURI, URI newURI, String serdePropKey, St
      printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun);
    }
  }

+  private void findCompactions(HiveMetaTool metaTool) {
+    try {
+      findCompactionsInternal();
+    }
+    catch(TException|IOException ex) {
+      System.err.println(StringUtils.stringifyException(ex));
+      printAndExit(metaTool);
+    }
+  }
+  /**
+   * todo: make sure compaction queue is configured and has ample capacity
+   * todo: what to do on failure? Suppose some table/partition is not readable: should the tool
+   * still produce commands for all the other tables?
+   * @throws MetaException
+   * @throws TException
+   */
+  private void findCompactionsInternal() throws MetaException, TException, IOException {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    initObjectStore(conf);//todo: not actually using this
+    System.out.println("Looking for Acid tables that need to be compacted");
+    //todo: check if acid is enabled and bail if not
+    //todo: check that running on 2.x?
+    HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+    List<String> databases = hms.getAllDatabases();//TException
+    System.out.println("Found " + databases.size() + " databases to process");
+    List<String> compactions = new ArrayList<>();
+    for(String dbName : databases) {
+      List<String> tables = hms.getAllTables(dbName);
+      System.out.println("Found " + tables.size() + " tables in " + dbName);
+      for(String tableName : tables) {
+        Table t = hms.getTable(dbName, tableName);
+
+        //ql depends on metastore and is not accessible here... and even if it were, it would
+        //pull in the 2.6 exec jar rather than 3.0, which is not what we want
+        List<String> compactionCommands = getCompactionCommands(t, conf, hms);
+        compactions.addAll(compactionCommands);
+        if(compactionCommands.isEmpty()) {
+          /* check if 't' is not acid but could be made acid or MM,
+           * and output a script for that
+           *
+           * todo: handle renaming files somewhere
+           */
+        }
+      }
+    }
+    makeCompactionScript(compactions);
+  }
+
+  /**
+   * Currently writes to the current working directory.
+   * If there is nothing to compact, outputs an empty file so that a run with no work to do
+   * is not confused with a failed run.
+   */
+  private static void makeCompactionScript(List<String> commands) throws IOException {
+    try (PrintWriter pw = new PrintWriter(
+        new FileWriter("./compacts_" + System.currentTimeMillis() + ".sql"))) {
+      for(String cmd : commands) {
+        pw.println(cmd);
+      }
+    }
+  }
+  /**
+   * @return any compaction commands to run
+   */
+  private static List<String> getCompactionCommands(Table t, Configuration conf, HiveMetaStoreClient hms) throws IOException, TException {
+    if(!isFullAcidTable(t)) {
+      return Collections.emptyList();
+    }
+    if(t.getPartitionKeysSize() <= 0) {
+      //not partitioned
+      if(!needsCompaction(new Path(t.getSd().getLocation()), conf)) {
+        return Collections.emptyList();
+      }
+
+      List<String> cmds = new ArrayList<>();
+      cmds.add(getCompactionCommand(t, null));
+      return cmds;
+    }
+    List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+    int batchSize = 1000;//todo: right size?
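+    //fetch Partition objects in batches so that tables with very many partitions do not
+    //require materializing all of them in memory at once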
+    int numWholeBatches = partNames.size()/batchSize;
+    List<String> compactionCommands = new ArrayList<>();
+    for(int i = 0; i < numWholeBatches; i++) {
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize));
+      for(Partition p : partitionList) {
+        if(needsCompaction(new Path(p.getSd().getLocation()), conf)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    if(numWholeBatches * batchSize < partNames.size()) {
+      //last partial batch
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size()));
+      for (Partition p : partitionList) {
+        if (needsCompaction(new Path(p.getSd().getLocation()), conf)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    return compactionCommands;
+  }
+  /**
+   * Checks whether the given location contains delta directories with update/delete events.
+   * @param location - path to a partition (or table if not partitioned) dir
+   * @param conf used to resolve the FileSystem for {@code location}
+   * @return true if this location needs to be major compacted before the upgrade
+   * @throws IOException
+   */
+  private static boolean needsCompaction(Path location, Configuration conf) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("delta_");
+      }
+    });
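+    //delta dir names encode a transaction id range, e.g. delta_0000001_0000002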
+    if(deltas == null || deltas.length == 0) {
+      //base_n cannot contain update/delete. Original files are all 'insert'
+      return false;
+    }
+    deltaLoop: for(FileStatus delta : deltas) {
+      if(!delta.isDirectory()) {
+        //should never happen - just in case
+        continue;
+      }
+      FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+          //bucket_x or bucket_x_flush_length
+          return path.getName().startsWith("bucket_");
+        }
+      });
+      for(FileStatus bucket : buckets) {
+        if(bucket.getPath().getName().endsWith("_flush_length")) {
+          //streaming ingest dir - cannot have update/delete events
+          continue deltaLoop;
+        }
+        if(needsCompaction(bucket, fs)) {
+          //found update/delete events - this 'location' needs compacting
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+    //create reader, look at footer
+    //no need to check side file since it can only be in a streaming ingest delta
+    Reader orcReader = OrcFile.createReader(bucket.getPath(), OrcFile.readerOptions(fs.getConf())
+        .filesystem(fs));
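+    //acid event counts (inserts/updates/deletes) are kept in the ORC file's user metadata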
+    AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
+    if(as == null) {
+      //should never happen since we are reading bucket_x written by an acid write
+      throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+    }
+    return as.deletes > 0 || as.updates > 0;
+  }
+  private static String getCompactionCommand(Table t, Partition p) {
+    StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+    if(t.getPartitionKeysSize() > 0) {
+      assert p != null : "must supply partition for partitioned table " +
+          Warehouse.getQualifiedName(t);
+      sb.append(" PARTITION(");
+      for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+        //todo: should the key names be quoted? HiveUtils.unparseIdentifier()
+        sb.append(t.getPartitionKeys().get(i).getName()).append("='")
+            .append(p.getValues().get(i)).append("',");
+      }
+      sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+    }
+    return sb.append(" COMPACT 'major'").toString();
+  }
+  private static boolean isFullAcidTable(Table t) {
+    if (t.getParametersSize() <= 0) {
+      //cannot be acid
+      return false;
+    }
+    String transactionalValue = t.getParameters()
+        .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if ("true".equalsIgnoreCase(transactionalValue)) {
+      System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+      return true;
+    }
+    return false;
+  }
  private static void printAndExit(HiveMetaTool metaTool) {
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp("metatool", metaTool.cmdLineOptions);
@@ -460,7 +678,9 @@ public static void main(String[] args) {
      } else {
        metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun);
      }
-    } else {
+    } else if (line.hasOption("findCompactions")) {
+      metaTool.findCompactions(metaTool);
+    } else {
      if (line.hasOption("dryRun")) {
        System.err.println("HiveMetaTool: dryRun is not a valid standalone option");
      } else if (line.hasOption("serdePropKey")) {
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/tools/TestHiveMetaTool.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/tools/TestHiveMetaTool.java
index 7ac6e9bc69..ee0320c01d 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/tools/TestHiveMetaTool.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/tools/TestHiveMetaTool.java
@@ -1,4 +1,5 @@
package org.apache.hadoop.hive.metastore.tools;
+
public class TestHiveMetaTool {
}