diff --git a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
index 2547f25..04782a6 100644
--- a/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
+++ b/upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
@@ -50,14 +50,12 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.HiveVersionInfo;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.impl.AcidStats;
-import org.apache.orc.impl.OrcAcidUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +63,10 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -423,54 +425,7 @@ private static void scheduleCompaction(Table t, Partition p, Hive db,
     }
     compactionMetaInfo.compactionIds.add(resp.getId());
   }
-  /**
-   *
-   * @param location - path to a partition (or table if not partitioned) dir
-   */
-  private static boolean needsCompaction2(Path location, HiveConf conf,
-      CompactionMetaInfo compactionMetaInfo) throws IOException {
-    FileSystem fs = location.getFileSystem(conf);
-    FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        //checking for delete_delta is only so that this functionality can be exercised by code 3.0
-        //which cannot produce any deltas with mix of update/insert events
-        return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
-      }
-    });
-    if(deltas == null || deltas.length == 0) {
-      //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
-      //only if there are update/delete events.
-      return false;
-    }
-    deltaLoop: for(FileStatus delta : deltas) {
-      if(!delta.isDirectory()) {
-        //should never happen - just in case
-        continue;
-      }
-      FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
-          //bucket_x or bucket_x__flush_length
-          return path.getName().startsWith("bucket_");
-        }
-      });
-      for(FileStatus bucket : buckets) {
-        if(bucket.getPath().getName().endsWith("_flush_length")) {
-          //streaming ingest dir - cannot have update/delete events
-          continue deltaLoop;
-        }
-        if(needsCompaction(bucket, fs)) {
-          //found delete events - this 'location' needs compacting
-          compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
-          //todo: this is not remotely accurate if you have many (relevant) original files
-          return true;
-        }
-      }
-    }
-    return false;
-  }
+
   /**
    *
    * @param location - path to a partition (or table if not partitioned) dir
@@ -536,18 +491,32 @@ private static long getDataSize(Path location, HiveConf conf) throws IOException
     ContentSummary cs = fs.getContentSummary(location);
     return cs.getLength();
   }
+
+
+  private static final Charset utf8 = Charset.forName("UTF-8");
+  private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
+  private static final String ACID_STATS = "hive.acid.stats";
+
   private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
     //create reader, look at footer
     //no need to check side file since it can only be in a streaming ingest delta
-    Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf())
-        .filesystem(fs));
-    AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
-    if(as == null) {
-      //should never happen since we are reading bucket_x written by acid write
+    Reader orcReader = OrcFile.createReader(bucket.getPath(), OrcFile.readerOptions(fs.getConf()).filesystem(fs));
+    if (orcReader.hasMetadataValue(ACID_STATS)) {
+      try {
+        ByteBuffer val = orcReader.getMetadataValue(ACID_STATS).duplicate();
+        String acidStats = utf8Decoder.decode(val).toString();
+        String[] parts = acidStats.split(",");
+        long updates = Long.parseLong(parts[1]);
+        long deletes = Long.parseLong(parts[2]);
+        return deletes > 0 || updates > 0;
+      } catch (CharacterCodingException e) {
+        throw new IllegalArgumentException("Bad string encoding for " + ACID_STATS, e);
+      }
+    } else {
       throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
     }
-    return as.deletes > 0 || as.updates > 0;
   }
+
   private static String getCompactionCommand(Table t, Partition p) {
     StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
     if(t.getPartitionKeysSize() > 0) {
diff --git a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java
index 4fe7007..fe4b08b 100644
--- a/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java
+++ b/upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java
@@ -44,8 +44,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -53,7 +51,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestPreUpgradeTool {
-  private static final Logger LOG = LoggerFactory.getLogger(TestPreUpgradeTool.class);
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
       File.separator + TestPreUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
   ).getPath().replaceAll("\\\\", "/");
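
For context (not part of the patch): the rewritten needsCompaction() no longer calls OrcAcidUtils.parseAcidStats(), it reads the same information straight from the ORC footer under the "hive.acid.stats" user-metadata key, whose value is a comma-separated "inserts,updates,deletes" counter string. Below is a minimal standalone sketch of that check using only the Hive ql.io.orc reader API the patch itself imports; the class name and the bucket path are hypothetical examples, not anything introduced by the patch.

// Sketch only: probe one ACID bucket file for update/delete events via the
// "hive.acid.stats" footer key, mirroring the approach in the patched needsCompaction().
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

public class AcidStatsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical bucket path; pass a real one as the first argument.
    Path bucket = new Path(args.length > 0 ? args[0]
        : "/warehouse/t/delta_0000001_0000001/bucket_00000");
    FileSystem fs = bucket.getFileSystem(conf);
    Reader reader = OrcFile.createReader(bucket, OrcFile.readerOptions(conf).filesystem(fs));
    if (reader.hasMetadataValue("hive.acid.stats")) {
      // Footer value is "inserts,updates,deletes"; only updates/deletes force compaction.
      ByteBuffer val = reader.getMetadataValue("hive.acid.stats").duplicate();
      String[] parts = StandardCharsets.UTF_8.decode(val).toString().split(",");
      long updates = Long.parseLong(parts[1]);
      long deletes = Long.parseLong(parts[2]);
      System.out.println("needs compaction: " + (updates > 0 || deletes > 0));
    } else {
      System.out.println("no hive.acid.stats footer in " + bucket);
    }
  }
}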