diff --git pom.xml pom.xml
index 4fb83c9df0..4e6feefc4a 100644
--- pom.xml
+++ pom.xml
@@ -106,7 +106,7 @@
     2.4
     2.4
     3.1.0
-    2.20.1
+    2.21.0
     2.4
     2.8
     2.9
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 0e53697be2..0576c02a8b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.tools.HiveMetaTool;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -535,4 +536,39 @@ public void testMMExportAborted() throws Exception {
         TestTxnCommands2.stringifyValues(data), rs);
   }
 
+  @Test
+  public void testMetaTool() throws Exception {
+    int[][] data = {{1,2}, {3, 4}, {5, 6}};
+    int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+    runStatementOnDriver("drop table if exists TAcid");
+    runStatementOnDriver("drop table if exists TAcidPart");
+    runStatementOnDriver("drop table if exists TFlat");
+    runStatementOnDriver("drop table if exists TFlatText");
+    runStatementOnDriver("create table TAcid (a int, b int) stored as orc");
+    runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) stored" +
+        " as orc");
+    runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+    runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+
+    //this needs major compaction
+    runStatementOnDriver("insert into TAcid" + TestTxnCommands2.makeValuesClause(data));
+    runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+    //this table needs to be converted to Acid
+    runStatementOnDriver("insert into TFlat" + TestTxnCommands2.makeValuesClause(data));
+
+    //this table needs to be converted to MM
+    runStatementOnDriver("insert into TFlatText" + TestTxnCommands2.makeValuesClause(data));
+
+    //p=10 needs major compaction
+    runStatementOnDriver("insert into TAcidPart" + TestTxnCommands2.makeValuesClause(dataPart));
+    runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+
+    //todo: rename files case
+    String[] args = new String[1];
+    args[0] = new String("-prepareAcidUpgrade");
+    HiveMetaTool.main(args);
+  }
 }
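For reference, given the tables created in testMetaTool above, the compaction script produced by the HiveMetaTool changes further down (compacts_<timestamp>.sql) would be expected to contain statements of roughly this shape; the test does not assert on the script contents, so this is purely illustrative (identifiers shown as the metastore stores them, lower-cased):

    ALTER TABLE default.tacid COMPACT 'major';
    ALTER TABLE default.tacidpart PARTITION(p=10) COMPACT 'major';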
diff --git standalone-metastore/pom.xml standalone-metastore/pom.xml
index c340fe2d24..10b1bfadf3 100644
--- standalone-metastore/pom.xml
+++ standalone-metastore/pom.xml
@@ -80,6 +80,7 @@
     0.9.3
     2.8.2
     1.10.19
+    <orc.version>1.4.3</orc.version>
     2.5.0
     1.3.0
     2.6.0-SNAPSHOT
@@ -92,6 +93,21 @@
 
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>${orc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index f4eacd5fb9..6d1c6736f2 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -18,8 +18,13 @@
 package org.apache.hadoop.hive.metastore.tools;
 
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +39,22 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -104,6 +124,12 @@ private void init() {
         .withDescription("Specify the key for table property to be updated. tablePropKey option " +
             "is valid only with updateLocation option.")
         .create("tablePropKey");
+    Option prepareAcidUpgrade =
+        OptionBuilder.withArgName("find-compactions")
+            .hasOptionalArg() //directory to output results to
+            .withDescription("Generates a set of compaction commands to run to prepare for Hive 2.x" +
+                " to 3.0 upgrade")
+            .create("prepareAcidUpgrade");
 
     cmdLineOptions.addOption(help);
     cmdLineOptions.addOption(listFSRoot);
@@ -112,6 +138,7 @@ private void init() {
     cmdLineOptions.addOption(dryRun);
     cmdLineOptions.addOption(serdePropKey);
     cmdLineOptions.addOption(tablePropKey);
+    cmdLineOptions.addOption(prepareAcidUpgrade);
   }
 
   private void initObjectStore(Configuration conf) {
@@ -362,7 +389,230 @@ public void updateFSRootLocation(URI oldURI, URI newURI, String serdePropKey, St
       printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun);
     }
   }
+  private void prepareAcidUpgrade(HiveMetaTool metaTool) {
+    try {
+      prepareAcidUpgradeInternal();
+    }
+    catch(TException|IOException ex) {
+      System.err.println(StringUtils.stringifyException(ex));
+      printAndExit(metaTool);
+    }
+  }
+  /**
+   * todo: make sure compaction queue is configured and has ample capacity
+   * todo: what to do on failure? Suppose some table/part is not readable - should it still produce
+   * commands for all other tables?
+   * todo: should probably support dryRun mode where we output scripts but instead of renaming files
+   * we generate a renaming script. Alternatively, always generate a renaming script and have the
+   * user execute it - this is probably a better option. If the script is not empty on rerun, someone
+   * added files to a table to be made Acid.
+   * todo: how do we test this? even if we had 2.x data it won't be readable in 3.0. even w/o any
+   * updates, txnids in the data won't make sense in 3.0 w/o actually performing the equivalent of a
+   * metastore upgrade to init the writeid table. Also, can we even create a new table and set its
+   * location to existing files to simulate a 2.x table?
+   *
+   * @throws MetaException
+   * @throws TException
+   */
+  private void prepareAcidUpgradeInternal() throws MetaException, TException, IOException {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    initObjectStore(conf);//todo: not actually using this
+    System.out.println("Looking for Acid tables that need to be compacted");
+    //todo: check if acid is enabled and bail if not
+    //todo: check that running on 2.x?
+    HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+    List<String> databases = hms.getAllDatabases();//TException
+    System.out.println("Found " + databases.size() + " databases to process");
+    List<String> compactions = new ArrayList<>();
+    List<String> convertToAcid = new ArrayList<>();
+    List<String> convertToMM = new ArrayList<>();
+    final String scriptLocation = ".";//todo: get this from input
+    for(String dbName : databases) {
+      List<String> tables = hms.getAllTables(dbName);
+      System.out.println("found " + tables.size() + " tables in " + dbName);
+      for(String tableName : tables) {
+        Table t = hms.getTable(dbName, tableName);
+
+        //ql depends on metastore and is not accessible here... and if it was, I would not be using
+        //2.6 exec jar, but 3.0.... which is not what we want
+        List<String> compactionCommands = getCompactionCommands(t, conf, hms);
+        compactions.addAll(compactionCommands);
+        processConversion(t, convertToAcid, convertToMM);
+        /*todo: handle renaming files somewhere*/
+      }
+    }
+    makeCompactionScript(compactions, scriptLocation);
+    makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+    makeRenameFileScript(scriptLocation);
+  }
+  private static void processConversion(Table t, List<String> convertToAcid, List<String> convertToMM) {
+    if(isFullAcidTable(t)) {
+      return;
+    }
+    //todo: check that it's a MANAGED table, not a view, etc
+    if(t.getPartitionKeysSize() <= 0) {
+      t.getSd().getInputFormat(); //todo: this is a string so we need to load the class to check isAssignable,
+      // i.e. implements AcidInputFormat/AcidOutputFormat so... ClassLoader issues?
+      t.getSd().getOutputFormat();
+    }
+    else {
+      //have to check each partition's StorageDescriptor
+    }
+  }
+  /**
+   * Currently writes to the current dir (whatever that is).
+   * If there is nothing to compact, outputs an empty file so as not to confuse the output with a
+   * failed run.
+   * todo: add some config to tell it where to put the script
+   */
+  private static void makeCompactionScript(List<String> commands, String scriptLocation) throws IOException {
+    createScript(commands, "compacts_" + System.currentTimeMillis() + ".sql", scriptLocation);
+  }
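+  /*
+   * Illustrative only (processConversion() above does not populate the lists yet): once it is
+   * implemented, the two conversion scripts built below would be expected to hold statements
+   * along the lines of
+   *   ALTER TABLE <db>.<table> SET TBLPROPERTIES('transactional'='true');
+   * for tables to be converted to full Acid, and
+   *   ALTER TABLE <db>.<table> SET TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only');
+   * for tables to be converted to MM (insert-only).
+   */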
+  private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm, String scriptLocation) throws IOException {
+    createScript(alterTableAcid, "convertToAcid_" + System.currentTimeMillis() + ".sql", scriptLocation);
+    createScript(alterTableMm, "convertToMM_" + System.currentTimeMillis() + ".sql", scriptLocation);
+  }
+  private static void createScript(List<String> commands, String fileName, String scriptLocation) throws IOException {
+    //todo: make sure to create the file in 'scriptLocation' dir
+    FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
+    PrintWriter pw = new PrintWriter(fw);
+    for(String cmd : commands) {
+      pw.println(cmd + ";");
+    }
+    pw.close();//also closes the underlying FileWriter
+  }
+  private static void makeRenameFileScript(String scriptLocation) throws IOException {
+    createScript(Collections.emptyList(), "normalizeFileNames_" + System.currentTimeMillis() + ".sh", scriptLocation);
+  }
+  /**
+   * @return any compaction commands to run
+   */
+  private static List<String> getCompactionCommands(Table t, Configuration conf, HiveMetaStoreClient hms)
+      throws IOException, TException {
+    if(!isFullAcidTable(t)) {
+      return Collections.emptyList();
+    }
+    if(t.getPartitionKeysSize() <= 0) {
+      //not partitioned
+      if(!needsCompaction(new Path(t.getSd().getLocation()), conf)) {
+        return Collections.emptyList();
+      }
+
+      List<String> cmds = new ArrayList<>();
+      cmds.add(getCompactionCommand(t, null));
+      return cmds;
+    }
+    List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+    int batchSize = 1000;//todo: right size?
+    int numWholeBatches = partNames.size()/batchSize;
+    List<String> compactionCommands = new ArrayList<>();
+    for(int i = 0; i < numWholeBatches; i++) {
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+          partNames.subList(i * batchSize, (i + 1) * batchSize));
+      for(Partition p : partitionList) {
+        if(needsCompaction(new Path(p.getSd().getLocation()), conf)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    if(numWholeBatches * batchSize < partNames.size()) {
+      //last partial batch
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+          partNames.subList(numWholeBatches * batchSize, partNames.size()));
+      for (Partition p : partitionList) {
+        if (needsCompaction(new Path(p.getSd().getLocation()), conf)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    return compactionCommands;
+  }
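+  /*
+   * For orientation, the layout the check below walks looks roughly like this (names illustrative,
+   * as written by typical Hive 2.x Acid tables):
+   *   <table or partition dir>/delta_0000001_0000001/bucket_00000
+   *   <table or partition dir>/delete_delta_0000002_0000002/bucket_00000
+   * plus, for streaming ingest deltas, bucket_00000_flush_length side files.
+   */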
+  /**
+   * @param location - path to a partition (or table if not partitioned) dir
+   */
+  private static boolean needsCompaction(Path location, Configuration conf) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        //checking for delete_delta is only so that this functionality can be exercised by 3.0 code,
+        //which cannot produce any deltas with a mix of update/insert events
+        return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+      }
+    });
+    if(deltas == null || deltas.length == 0) {
+      //base_n cannot contain update/delete.  Original files are all 'insert'
+      return false;
+    }
+    deltaLoop: for(FileStatus delta : deltas) {
+      if(!delta.isDirectory()) {
+        //should never happen - just in case
+        continue;
+      }
+      FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+          //bucket_x or bucket_x_flush_length
+          return path.getName().startsWith("bucket_");
+        }
+      });
+      for(FileStatus bucket : buckets) {
+        if(bucket.getPath().getName().endsWith("_flush_length")) {
+          //streaming ingest dir - cannot have update/delete events
+          continue deltaLoop;
+        }
+        if(needsCompaction(bucket, fs)) {
+          //found delete events - this 'location' needs compacting
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+    //create reader, look at footer
+    //no need to check side file since it can only be in a streaming ingest delta
+    Reader orcReader = OrcFile.createReader(bucket.getPath(), OrcFile.readerOptions(fs.getConf())
+        .filesystem(fs));
+    AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
+    if(as == null) {
+      //should never happen since we are reading a bucket_x file written by an acid write
+      throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+    }
+    return as.deletes > 0 || as.updates > 0;
+  }
+  private static String getCompactionCommand(Table t, Partition p) {
+    StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+    if(t.getPartitionKeysSize() > 0) {
+      assert p != null : "must supply partition for partitioned table " +
+          Warehouse.getQualifiedName(t);
+      sb.append(" PARTITION(");
+      for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+        //todo: should these be quoted? HiveUtils.unparseIdentifier() - if the value is a String it should definitely be quoted
+        sb.append(t.getPartitionKeys().get(i).getName()).append('=')
+            .append(p.getValues().get(i)).append(",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+    }
+    return sb.append(" COMPACT 'major'").toString();
+  }
+  private static boolean isFullAcidTable(Table t) {
+    if (t.getParametersSize() <= 0) {
+      //cannot be acid
+      return false;
+    }
+    String transactionalValue = t.getParameters()
+        .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (transactionalValue != null && "true".equalsIgnoreCase(transactionalValue)) {
+      System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+      return true;
+    }
+    return false;
+  }
   private static void printAndExit(HiveMetaTool metaTool) {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp("metatool", metaTool.cmdLineOptions);
@@ -460,7 +710,16 @@ public static void main(String[] args) {
       } else {
         metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun);
       }
-    } else {
+    } else if(line.hasOption("prepareAcidUpgrade")) {
+      String[] values = line.getOptionValues("prepareAcidUpgrade");
+      String targetDir = null;//todo: pass this to prepareAcidUpgrade() once it accepts an output dir
+      if(values != null && values.length > 0) {
+        if(values.length > 1) {
+          System.err.println("HiveMetaTool: prepareAcidUpgrade takes at most one argument (output directory)");
+        }
+      }
+      metaTool.prepareAcidUpgrade(metaTool);
+    } else {
       if (line.hasOption("dryRun")) {
         System.err.println("HiveMetaTool: dryRun is not a valid standalone option");
       } else if (line.hasOption("serdePropKey")) {
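As a usage sketch (assuming the tool is launched through the usual hive --service metatool wrapper; <ts> stands for the timestamp embedded in the generated file names):

    hive --service metatool -prepareAcidUpgrade

    # files written to the current working directory:
    #   compacts_<ts>.sql            ALTER TABLE ... COMPACT 'major' statements
    #   convertToAcid_<ts>.sql       reserved for convert-to-Acid statements (not populated by this patch)
    #   convertToMM_<ts>.sql         reserved for convert-to-MM statements (not populated by this patch)
    #   normalizeFileNames_<ts>.sh   placeholder for the file renaming step (see the todo comments)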