diff --git a/ql/pom.xml b/ql/pom.xml index 7087a4c..ccddb96 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -342,6 +342,13 @@ org.apache.hadoop + hadoop-mapreduce-client-common + ${hadoop-23.version} + true + test + + + org.apache.hadoop hadoop-hdfs ${hadoop-23.version} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java new file mode 100644 index 0000000..d9dd4e4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +/** + * A class to clean directories after compactions. This will run in a separate thread. + */ +public class Cleaner extends CompactorThread { + static final private String CLASS_NAME = Cleaner.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private int threadId = 10001; + + private long cleanerCheckInterval = 5000; + + @Override + public void run() { + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. + while (!stop.timeToStop) { + try { + long startedAt = System.currentTimeMillis(); + + // Now look for new entries ready to be cleaned. + List toClean = txnHandler.findReadyToClean(); + for (CompactionInfo ci : toClean) { + LockComponent comp = null; + if (ci.partName == null) { + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname, + ci.tableName, null); + } else { + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.PARTITION, ci.dbname, + ci.tableName, ci.partName); + } + List components = new ArrayList(1); + components.add(comp); + LockRequest rqst = new LockRequest(components, System.getProperty("user.name"), + Worker.hostname()); + LockResponse rsp = txnHandler.lockNoWait(rqst); + try { + if (rsp.getState() == LockState.ACQUIRED) { + clean(ci); + } + } finally { + txnHandler.unlock(new UnlockRequest(rsp.getLockid())); + } + } + + // Now, go back to bed until it's time to do this again + long elapsedTime = System.currentTimeMillis() - startedAt; + if (elapsedTime >= cleanerCheckInterval) continue; + else Thread.sleep(cleanerCheckInterval - elapsedTime); + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor cleaner, " + + StringUtils.stringifyException(t)); + } + } + } + + private void clean(CompactionInfo ci) { + String s = "Starting cleaning for " + ci.getFullPartitionName(); + LOG.info(s); + try { + StorageDescriptor sd = resolveStorageDescriptor(resolveTable(ci), resolvePartition(ci)); + String location = sd.getLocation(); + + // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open + // transactions. This assures that all deltas are treated as valid and all we return are + // obsolete files. + GetOpenTxnsResponse rsp = new GetOpenTxnsResponse(Long.MAX_VALUE, new HashSet()); + IMetaStoreClient.ValidTxnList txnList = new HiveMetaStoreClient.ValidTxnListImpl(rsp); + + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList); + List obsoleteDirs = dir.getObsolete(); + final List filesToDelete = new ArrayList(obsoleteDirs.size()); + for (FileStatus stat : obsoleteDirs) { + filesToDelete.add(stat.getPath()); + } + if (filesToDelete.size() < 1) { + LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + + ", that hardly seems right."); + return; + } + final FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + + if (runJobAsSelf(ci.runAs)) { + removeFiles(fs, filesToDelete); + } else { + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + removeFiles(fs, filesToDelete); + return null; + } + }); + } + + } catch (Exception e) { + LOG.error("Caught exception when cleaning, unable to complete cleaning " + + StringUtils.stringifyException(e)); + } finally { + // We need to clean this out one way or another. + txnHandler.markCleaned(ci); + } + + } + + private void removeFiles(FileSystem fs, List deadFilesWalking) throws IOException { + for (Path dead : deadFilesWalking) { + LOG.debug("Doing to delete path " + dead.toString()); + fs.delete(dead, true); + } + + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java new file mode 100644 index 0000000..979b07a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -0,0 +1,519 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.*; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; + +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.util.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.*; + +/** + * Class to do compactions via an MR job. This has to be in the ql package rather than metastore + * .compactions package with all of it's relatives because it needs access to the actual input + * and output formats, which are in ql. ql depends on metastore and we can't have a circular + * dependency. + */ +public class CompactorMR { + + static final private String CLASS_NAME = CompactorMR.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + static final private String INPUT_FORMAT_CLASS_NAME = "hive.compactor.input.format.class.name"; + static final private String OUTPUT_FORMAT_CLASS_NAME = "hive.compactor.output.format.class.name"; + static final private String LOCATION = "hive.compactor.input.dir"; + static final private String SERDE = "hive.compactor.serde"; + static final private String MIN_TXN = "hive.compactor.txn.min"; + static final private String MAX_TXN = "hive.compactor.txn.max"; + static final private String IS_MAJOR = "hive.compactor.is.major"; + static final private String IS_COMPRESSED = "hive.compactor.is.compressed"; + static final private String TABLE_PROPS = "hive.compactor.table.props"; + static final private String NUM_BUCKETS = "hive.compactor.num.buckets"; + static final private String BASE_DIR = "hive.compactor.base.dir"; + static final private String DELTA_DIRS = "hive.compactor.delta.dirs"; + static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search"; + + public CompactorMR() { + } + + /** + * Run a compactor job. + * @param conf Hive configuration file + * @param jobName name to run this job with + * @param t metastore table + * @param sd metastore storage descriptor + * @param txns list of valid transactions + * @param isMajor is this a major compaction? + */ + void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, + IMetaStoreClient.ValidTxnList txns, boolean isMajor) { + try { + JobConf job = new JobConf(conf); + job.setJobName(jobName); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setJarByClass(CompactorMR.class); + LOG.debug("User jar set to " + job.getJar()); + job.setMapperClass(CompactorMap.class); + job.setNumReduceTasks(0); + job.setInputFormat(CompactorInputFormat.class); + job.setOutputFormat(NullOutputFormat.class); + + job.set(LOCATION, sd.getLocation()); + job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat()); + job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat()); + job.set(SERDE, sd.getSerdeInfo().getName()); + job.setBoolean(IS_MAJOR, isMajor); + job.setBoolean(IS_COMPRESSED, sd.isCompressed()); + job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); + job.setInt(NUM_BUCKETS, sd.getBucketColsSize()); + + // Figure out and encode what files we need to read. We do this here (rather than in + // getSplits below) because as part of this we discover our minimum and maximum transactions, + // and discovering that in getSplits is too late as we then have no way to pass it to our + // mapper. + + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); + StringableList dirsToSearch = new StringableList(); + Path baseDir = null; + if (isMajor) { + // There may not be a base dir if the partition was empty before inserts or if this + // partition is just now being converted to ACID. + baseDir = dir.getBaseDirectory(); + if (baseDir == null) { + List originalFiles = dir.getOriginalFiles(); + if (!(originalFiles == null) && !(originalFiles.size() == 0)) { + // There are original format files + for (FileStatus stat : originalFiles) { + dirsToSearch.add(stat.getPath()); + LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search"); + } + // Set base to the location so that the input format reads the original files. + baseDir = new Path(sd.getLocation()); + + /* + Path origDir = new Path(sd.getLocation()); + dirsToSearch.add(origDir); + LOG.debug("Adding original directory " + origDir + " to dirs to search"); + */ + } + } else { + // add our base to the list of directories to search for files in. + LOG.debug("Adding base directory " + baseDir + " to dirs to search"); + dirsToSearch.add(baseDir); + } + } + + List parsedDeltas = dir.getCurrentDirectories(); + + if (parsedDeltas == null || parsedDeltas.size() == 0) { + // Seriously, no deltas? Can't compact that. + LOG.error("No delta files found to compact in " + sd.getLocation()); + return; + } + + StringableList deltaDirs = new StringableList(); + long minTxn = Long.MAX_VALUE; + long maxTxn = Long.MIN_VALUE; + for (AcidUtils.ParsedDelta delta : parsedDeltas) { + LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); + dirsToSearch.add(delta.getPath()); + deltaDirs.add(delta.getPath()); + minTxn = Math.min(minTxn, delta.getMinTransaction()); + maxTxn = Math.max(maxTxn, delta.getMaxTransaction()); + } + + if (baseDir != null) job.set(BASE_DIR, baseDir.toString()); + job.set(DELTA_DIRS, deltaDirs.toString()); + job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); + job.setLong(MIN_TXN, minTxn); + job.setLong(MAX_TXN, maxTxn); + LOG.debug("Setting minimum transaction to " + minTxn); + LOG.debug("Setting maximume transaction to " + maxTxn); + + JobClient.runJob(job); + } catch (Throwable e) { + // Don't let anything past us. + LOG.error("Running MR job " + jobName + " to compact failed, " + + StringUtils.stringifyException(e)); + } + } + + static class CompactorInputSplit implements InputSplit { + private long length = 0; + private List locations; + private int bucketNum; + private Path base; + private Path[] deltas; + + public CompactorInputSplit() { + } + + /** + * + * @param hadoopConf + * @param bucket bucket to be processed by this split + * @param files actual files this split should process. It is assumed the caller has already + * parsed out the files in base and deltas to populate this list. + * @param base directory of the base, or the partition/table location if the files are in old + * style. Can be null. + * @param deltas directories of the delta files. + * @throws IOException + */ + CompactorInputSplit(Configuration hadoopConf, int bucket, List files, Path base, + Path[] deltas) + throws IOException { + bucketNum = bucket; + this.base = base; + this.deltas = deltas; + locations = new ArrayList(); + + for (Path path : files) { + FileSystem fs = path.getFileSystem(hadoopConf); + FileStatus stat = fs.getFileStatus(path); + length += stat.getLen(); + BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, length); + for (int i = 0; i < locs.length; i++) { + String[] hosts = locs[i].getHosts(); + for (int j = 0; j < hosts.length; j++) { + locations.add(hosts[j]); + } + } + } + } + + @Override + public long getLength() throws IOException { + return length; + } + + @Override + public String[] getLocations() throws IOException { + return locations.toArray(new String[locations.size()]); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(length); + dataOutput.writeInt(locations.size()); + for (int i = 0; i < locations.size(); i++) { + dataOutput.writeInt(locations.get(i).length()); + dataOutput.writeBytes(locations.get(i)); + } + dataOutput.writeInt(bucketNum); + if (base == null) { + dataOutput.writeInt(0); + } else { + dataOutput.writeInt(base.toString().length()); + dataOutput.writeBytes(base.toString()); + } + dataOutput.writeInt(deltas.length); + for (int i = 0; i < deltas.length; i++) { + dataOutput.writeInt(deltas[i].toString().length()); + dataOutput.writeBytes(deltas[i].toString()); + } + + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + int len; + byte[] buf; + + locations = new ArrayList(); + length = dataInput.readLong(); + LOG.debug("Read length of " + length); + int numElements = dataInput.readInt(); + LOG.debug("Read numElements of " + numElements); + for (int i = 0; i < numElements; i++) { + len = dataInput.readInt(); + LOG.debug("Read file length of " + len); + buf = new byte[len]; + dataInput.readFully(buf); + locations.add(new String(buf)); + } + bucketNum = dataInput.readInt(); + LOG.debug("Read bucket number of " + bucketNum); + len = dataInput.readInt(); + LOG.debug("Read base path length of " + len); + if (len > 0) { + buf = new byte[len]; + dataInput.readFully(buf); + base = new Path(new String(buf)); + } + numElements = dataInput.readInt(); + deltas = new Path[numElements]; + for (int i = 0; i < numElements; i++) { + len = dataInput.readInt(); + buf = new byte[len]; + dataInput.readFully(buf); + deltas[i] = new Path(new String(buf)); + } + } + + int getBucket() { + return bucketNum; + } + + Path getBaseDir() { + return base; + } + + Path[] getDeltaDirs() { + return deltas; + } + } + + static class CompactorInputFormat + implements InputFormat { + + @Override + public InputSplit[] getSplits(JobConf entries, int i) throws IOException { + Path baseDir = null; + if (entries.get(BASE_DIR) != null) baseDir = new Path(entries.get(BASE_DIR)); + StringableList deltaDirs = new StringableList(entries.get(DELTA_DIRS)); + StringableList dirsToSearch = new StringableList(entries.get(DIRS_TO_SEARCH)); + + InputSplit[] splits = new InputSplit[entries.getInt(NUM_BUCKETS, 1)]; + for (int bucket = 0; bucket < splits.length; bucket++) { + + // Go find the actual files to read. This will change for each split. + List filesToRead = new ArrayList(); + for (Path d : dirsToSearch) { + // If this is a base or delta directory, then we need to be looking for a bucket file. + // But if it's a legacy file then we need to add it directly. + if (d.getName().startsWith(AcidUtils.BASE_PREFIX) || + d.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + Path bucketFile = AcidUtils.createBucketFile(d, bucket); + filesToRead.add(bucketFile); + LOG.debug("Adding " + bucketFile.toString() + " to files to read"); + } else { + filesToRead.add(d); + LOG.debug("Adding " + d + " to files to read"); + } + } + splits[bucket] = new CompactorInputSplit(entries, bucket, filesToRead, baseDir, + deltaDirs.toArray(new Path[deltaDirs.size()])); + } + LOG.debug("Returning " + splits.length + " splits"); + return splits; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, + Reporter reporter) throws IOException { + CompactorInputSplit split = (CompactorInputSplit)inputSplit; + AcidInputFormat aif = + instantiate(AcidInputFormat.class, entries.get(INPUT_FORMAT_CLASS_NAME)); + return aif.getRawReader(entries, entries.getBoolean(IS_MAJOR, false), split.getBucket(), + split.getBaseDir(), split.getDeltaDirs()); + } + } + + static class CompactorMap + implements Mapper { + + JobConf jobConf; + FSRecordWriter writer; + + @Override + public void map(RecordIdentifier identifier, V value, + OutputCollector nullWritableVOutputCollector, + Reporter reporter) throws IOException { + // After all this setup there's actually almost nothing to do. + getWriter(reporter, identifier.getBucketId()); // Make sure we've opened the writer + writer.write(value); + } + + @Override + public void close() throws IOException { + if (writer != null) writer.close(false); + } + + @Override + public void configure(JobConf entries) { + jobConf = entries; + } + + private void getWriter(Reporter reporter, int bucket) throws IOException { + if (writer == null) { + AbstractSerDe serde = instantiate(AbstractSerDe.class, jobConf.get(SERDE)); + ObjectInspector inspector; + try { + inspector = serde.getObjectInspector(); + } catch (SerDeException e) { + LOG.error("Unable to get object inspector, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } + + AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); + options.inspector(inspector) + .writingBase(jobConf.getBoolean(IS_MAJOR, false)) + .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) + .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) + .reporter(reporter) + .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(bucket); + + // Instantiate the underlying output format + AcidOutputFormat aof = + instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); + + Path location = AcidUtils.createFilename(new Path(jobConf.get(LOCATION)), options); + writer = aof.getRawRecordWriter(location, options); + } + } + } + + static class StringableMap extends HashMap { + + StringableMap(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.valueOf(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.valueOf(parts[0]); + String key = null; + if (len > 0) key = parts[1].substring(0, len); + parts = parts[1].substring(len).split(":", 2); + len = Integer.valueOf(parts[0]); + String value = null; + if (len > 0) value = parts[1].substring(0, len); + s = parts[1].substring(len); + put(key, value); + } + } + + StringableMap(Map m) { + super(m); + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Map.Entry entry : entrySet()) { + int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); + buf.append(entry.getKey() == null ? 0 : length); + buf.append(':'); + if (length > 0) buf.append(entry.getKey()); + length = (entry.getValue() == null) ? 0 : entry.getValue().length(); + buf.append(length); + buf.append(':'); + if (length > 0) buf.append(entry.getValue()); + } + } + return buf.toString(); + } + + public Properties toProperties() { + Properties props = new Properties(); + props.putAll(this); + return props; + } + } + + static class StringableList extends ArrayList { + StringableList() { + + } + + StringableList(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.valueOf(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.valueOf(parts[0]); + String val = parts[1].substring(0, len); + s = parts[1].substring(len); + add(new Path(val)); + } + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Path p : this) { + buf.append(p.toString().length()); + buf.append(':'); + buf.append(p.toString()); + } + } + return buf.toString(); + } + } + + private static T instantiate(Class classType, String classname) throws IOException { + T t = null; + try { + Class c = Class.forName(classname); + Object o = c.newInstance(); + if (classType.isAssignableFrom(o.getClass())) { + t = (T)o; + } else { + String s = classname + " is not an instance of " + classType.getName(); + LOG.error(s); + throw new IOException(s); + } + } catch (ClassNotFoundException e) { + LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } catch (InstantiationException e) { + LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } catch (IllegalAccessException e) { + LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } + return t; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java new file mode 100644 index 0000000..cdb2660 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreThread; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.RetryingRawStore; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; + +/** + * Superclass for all threads in the compactor. + */ +abstract class CompactorThread extends Thread implements MetaStoreThread { + static final private String CLASS_NAME = CompactorThread.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + protected HiveConf conf; + protected CompactionTxnHandler txnHandler; + protected RawStore rs; + protected int threadId; + protected BooleanPointer stop; + + @Override + public void setHiveConf(HiveConf conf) { + this.conf = conf; + } + + @Override + public void setThreadId(int threadId) { + this.threadId = threadId; + + } + + @Override + public void init(BooleanPointer stop) throws MetaException { + this.stop = stop; + setPriority(MIN_PRIORITY); + setDaemon(true); // this means the process will exit without waiting for this thread + + // Get our own instance of the transaction handler + txnHandler = new CompactionTxnHandler(conf); + + // Get our own connection to the database so we can get table and partition information. + rs = RetryingRawStore.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId); + } + + /** + * Find the table being compacted + * @param ci compaction info returned from the compaction queue + * @return metastore table + * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found. + */ + protected Table resolveTable(CompactionInfo ci) throws MetaException { + try { + return rs.getTable(ci.dbname, ci.tableName); + } catch (MetaException e) { + LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); + throw e; + } + } + + /** + * Get the partition being compacted. + * @param ci compaction info returned from the compaction queue + * @return metastore partition, or null if there is not partition in this compaction info + * @throws Exception if underlying calls throw, or if the partition name resolves to more than + * one partition. + */ + protected Partition resolvePartition(CompactionInfo ci) throws Exception { + Partition p = null; + if (ci.partName != null) { + List names = new ArrayList(1); + names.add(ci.partName); + List parts = null; + try { + parts = rs.getPartitionsByNames(ci.dbname, ci.tableName, names); + } catch (Exception e) { + LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage()); + throw e; + } + if (parts.size() != 1) { + LOG.error(ci.getFullPartitionName() + " does not refer to a single partition"); + throw new MetaException("Too many partitions"); + } + return parts.get(0); + } else { + return null; + } + } + + /** + * Get the storage descriptor for a compaction. + * @param t table from {@link #resolveTable(org.apache.hadoop.hive.metastore.txn.CompactionInfo)} + * @param p table from {@link #resolvePartition(org.apache.hadoop.hive.metastore.txn.CompactionInfo)} + * @return metastore storage descriptor. + */ + protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) { + return (p == null) ? t.getSd() : p.getSd(); + } + + /** + * Determine which user to run an operation as, based on the owner of the directory to be + * compacted. It is asserted that either the user running the hive metastore or the table + * owner must be able to stat the directory and determine the owner. + * @param location directory that will be read or written to. + * @param t metastore table object + * @return username of the owner of the location. + * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat + * the location. + */ + protected String findUserToRunAs(String location, Table t) throws IOException, + InterruptedException { + LOG.debug("Determining who to run the job as."); + final Path p = new Path(location); + final FileSystem fs = p.getFileSystem(conf); + try { + FileStatus stat = fs.getFileStatus(p); + LOG.debug("Running job as " + stat.getOwner()); + return stat.getOwner(); + } catch (AccessControlException e) { + // TODO not sure this is the right exception + LOG.debug("Unable to stat file as current user, trying as table owner"); + + // Now, try it as the table owner and see if we get better luck. + final List wrapper = new ArrayList(1); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + FileStatus stat = fs.getFileStatus(p); + wrapper.add(stat.getOwner()); + return null; + } + }); + + if (wrapper.size() == 1) { + LOG.debug("Running job as " + wrapper.get(0)); + return wrapper.get(0); + } + } + LOG.error("Unable to stat file as either current user or table owner, giving up"); + throw new IOException("Unable to stat file"); + } + + /** + * Determine whether to run this job as the current user or whether we need a doAs to switch + * users. + * @param owner of the directory we will be working in, as determined by + * {@link #findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)} + * @return true if the job should run as the current user, false if a doAs is needed. + */ + protected boolean runJobAsSelf(String owner) { + return (owner.equals(System.getProperty("user.name"))); + } + + protected String tableName(Table t) { + return t.getDbName() + "." + t.getTableName(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java new file mode 100644 index 0000000..33169d4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; + +/** + * A class to initiate compactions. This will run in a separate thread. + */ +public class Initiator extends CompactorThread { + static final private String CLASS_NAME = Initiator.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private int threadId = 10000; + + static final private String NO_COMPACTION = "NO_AUTO_COMPACTION"; + + private long checkInterval; + + @Override + public void run() { + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. + try { + recoverFailedCompactions(false); + + int abortedThreashold = HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD); + + while (!stop.timeToStop) { + long startedAt = System.currentTimeMillis(); + + // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop + // don't doom the entire thread. + try { + ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + IMetaStoreClient.ValidTxnList txns = new HiveMetaStoreClient.ValidTxnListImpl(openTxns); + List potentials = txnHandler.findPotentialCompactions(abortedThreashold); + LOG.debug("Found " + potentials.size() + " potential compactions, " + + "checking to see if we should compact any of them"); + for (CompactionInfo ci : potentials) { + LOG.debug("Checking to see if we should compact " + ci.getFullPartitionName()); + try { + Table t = resolveTable(ci); + // check if no compaction set for this table + if (t.getParameters().get(NO_COMPACTION) != null) { + LOG.info("Table " + tableName(t) + " marked " + NO_COMPACTION + + " so we will not compact it."); + continue; + } + + // Check if we already have initiated or are working on a compaction for this partition + // or table. If so, skip it. If we are just waiting on cleaning we can still check, + // as it may be time to compact again even though we haven't cleaned. + if (lookForCurrentCompactions(currentCompactions, ci)) { + LOG.debug("Found currently initiated or working compaction for " + + ci.getFullPartitionName() + " so we will not initiate another compaction"); + continue; + } + + // Figure out who we should run the file operations as + Partition p = resolvePartition(ci); + StorageDescriptor sd = resolveStorageDescriptor(t, p); + String runAs = findUserToRunAs(sd.getLocation(), t); + + if (checkForMajor(ci, txns, sd, runAs)) { + requestCompaction(ci, runAs, CompactionType.MAJOR); + } else if (checkForMinor(ci, txns, sd, runAs)) { + requestCompaction(ci, runAs, CompactionType.MINOR); + } + } catch (Throwable t) { + LOG.error("Caught exception while trying to determine if we should compact " + + ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " + + "" + StringUtils.stringifyException(t)); + txnHandler.markCleaned(ci); + } + } + + // Check for timed out remote workers. + recoverFailedCompactions(true); + + // Clean anything from the txns table that has no components left in txn_components. + txnHandler.cleanEmptyAbortedTxns(); + } catch (Throwable t) { + LOG.error("Initiator loop caught unexpected exception this time through the loop: " + + StringUtils.stringifyException(t)); + } + + long elapsedTime = System.currentTimeMillis() - startedAt; + if (elapsedTime >= checkInterval) continue; + else Thread.sleep(checkInterval - elapsedTime); + + } + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + + StringUtils.stringifyException(t)); + } + } + + @Override + public void init(BooleanPointer stop) throws MetaException { + super.init(stop); + checkInterval = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL); + } + + private void recoverFailedCompactions(boolean remoteOnly) { + if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname()); + txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT)); + } + + // Figure out if there are any currently running compactions on the same table or partition. + private boolean lookForCurrentCompactions(ShowCompactResponse compactions, + CompactionInfo ci) { + if (compactions.getCompacts() != null) { + for (ShowCompactResponseElement e : compactions.getCompacts()) { + if (e.getState() != TxnHandler.CLEANING_RESPONSE && + e.getDbname().equals(ci.dbname) && + e.getTablename().equals(ci.tableName) && + (e.getPartitionname() == null && ci.partName == null || + e.getPartitionname().equals(ci.partName))) { + return true; + } + } + } + return false; + } + + private boolean checkForMajor(final CompactionInfo ci, + final IMetaStoreClient.ValidTxnList txns, + final StorageDescriptor sd, + String runAs) throws IOException, InterruptedException { + // If it's marked as too many aborted, we already know we need to compact + if (ci.tooManyAborts) { + LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " + + "initiating major compaction"); + return true; + } + if (runJobAsSelf(runAs)) { + return deltaSizeBigEnough(ci, txns, sd); + } else { + final List hackery = new ArrayList(1); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + hackery.add(deltaSizeBigEnough(ci, txns, sd)); + return null; + } + }); + return hackery.get(0); + } + } + + private boolean deltaSizeBigEnough(CompactionInfo ci, + IMetaStoreClient.ValidTxnList txns, + StorageDescriptor sd) throws IOException { + Path location = new Path(sd.getLocation()); + FileSystem fs = location.getFileSystem(conf); + AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns); + Path base = dir.getBaseDirectory(); + FileStatus stat = fs.getFileStatus(base); + if (!stat.isDir()) { + LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!"); + return false; + } + long baseSize = sumDirSize(fs, base); + + List originals = dir.getOriginalFiles(); + for (FileStatus origStat : originals) { + baseSize += origStat.getLen(); + } + + long deltaSize = 0; + List deltas = dir.getCurrentDirectories(); + for (AcidUtils.ParsedDelta delta : deltas) { + stat = fs.getFileStatus(delta.getPath()); + if (!stat.isDir()) { + LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " + + "but it's a file!"); + return false; + } + deltaSize += sumDirSize(fs, delta.getPath()); + } + + float threshold = HiveConf.getFloatVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD); + boolean bigEnough = (float)deltaSize/(float)baseSize > threshold; + if (LOG.isDebugEnabled()) { + StringBuffer msg = new StringBuffer("delta size: "); + msg.append(deltaSize); + msg.append(" base size: "); + msg.append(baseSize); + msg.append(" threshold: "); + msg.append(threshold); + msg.append(" will major compact: " + bigEnough); + LOG.debug(msg); + } + return bigEnough; + } + + private long sumDirSize(FileSystem fs, Path dir) throws IOException { + long size = 0; + FileStatus[] buckets = fs.listStatus(dir); + for (int i = 0; i < buckets.length; i++) { + size += buckets[i].getLen(); + } + return size; + } + + private boolean checkForMinor(final CompactionInfo ci, + final IMetaStoreClient.ValidTxnList txns, + final StorageDescriptor sd, + String runAs) throws IOException, InterruptedException { + + if (runJobAsSelf(runAs)) { + return tooManyDeltas(ci, txns, sd); + } else { + final List hackery = new ArrayList(1); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + hackery.add(tooManyDeltas(ci, txns, sd)); + return null; + } + }); + return hackery.get(0); + } + } + + private boolean tooManyDeltas(CompactionInfo ci, IMetaStoreClient.ValidTxnList txns, + StorageDescriptor sd) throws IOException { + Path location = new Path(sd.getLocation()); + FileSystem fs = location.getFileSystem(conf); + + AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns); + List deltas = dir.getCurrentDirectories(); + int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD); + boolean enough = deltas.size() > threshold; + LOG.debug("Found " + deltas.size() + " delta files, threshold is " + threshold + + (enough ? "" : "not") + " requesting minor compaction"); + return enough; + } + + private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) { + String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName(); + LOG.info(s); + CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type); + if (ci.partName != null) rqst.setPartitionname(ci.partName); + rqst.setRunas(runAs); + txnHandler.compact(rqst); + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java new file mode 100644 index 0000000..1e5797c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.PrivilegedAction; + +/** + * A class to do compactions. This will run in a separate thread. It will spin on the + * compaction queue and look for new work to do. + */ +public class Worker extends CompactorThread { + static final private String CLASS_NAME = Worker.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private long SLEEP_TIME = 5000; + static final private int baseThreadNum = 10002; + + private String name; + + /** + * Get the hostname that this worker is run on. Made static and public so that other classes + * can use the same method to know what host their worker threads are running on. + * @return hostname + */ + public static String hostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve my host name " + e.getMessage()); + throw new RuntimeException(e); + } + } + + @Override + public void run() { + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. + try { + while (!stop.timeToStop) { + CompactionInfo ci = txnHandler.findNextToCompact(name); + + if (ci == null) { + try { + Thread.sleep(SLEEP_TIME); + continue; + } catch (InterruptedException e) { + LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + continue; + } + } + + // Find the table we will be working with. + Table t1 = null; + try { + t1 = resolveTable(ci); + } catch (MetaException e) { + txnHandler.markCleaned(ci); + continue; + } + // This chicanery is to get around the fact that the table needs to be final in order to + // go into the doAs below. + final Table t = t1; + + // Find the partition we will be working with, if there is one. + Partition p = null; + try { + p = resolvePartition(ci); + } catch (Exception e) { + txnHandler.markCleaned(ci); + continue; + } + + // Find the appropriate storage descriptor + final StorageDescriptor sd = resolveStorageDescriptor(t, p); + + // Check that the table or partition isn't sorted, as we don't yet support that. + if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { + LOG.error("Attempt to compact sorted table, which is not yet supported!"); + txnHandler.markCleaned(ci); + continue; + } + + final boolean isMajor = (ci.type == CompactionType.MAJOR); + final IMetaStoreClient.ValidTxnList txns = + new HiveMetaStoreClient.ValidTxnListImpl(txnHandler.getOpenTxns()); + final StringBuffer jobName = new StringBuffer(name); + jobName.append("-compactor-"); + jobName.append(ci.getFullPartitionName()); + + // Determine who to run as + String runAs; + if (ci.runAs == null) { + runAs = findUserToRunAs(sd.getLocation(), t); + txnHandler.setRunAs(ci.id, runAs); + } else { + runAs = ci.runAs; + } + + LOG.info("Starting " + ci.type.toString() + " compaction for " + + ci.getFullPartitionName()); + + final CompactorMR mr = new CompactorMR(); + if (runJobAsSelf(runAs)) { + mr.run(conf, jobName.toString(), t, sd, txns, isMajor); + } else { + UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedAction() { + @Override + public Object run() { + mr.run(conf, jobName.toString(), t, sd, txns, isMajor); + return null; + } + }); + } + } + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor worker " + name + + ", exiting " + StringUtils.stringifyException(t)); + } + + } + + @Override + public void init(BooleanPointer stop) throws MetaException { + super.init(stop); + + StringBuffer name = new StringBuffer(hostname()); + name.append("-"); + name.append(getId()); + this.name = name.toString(); + setName(name.toString()); + } + +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java new file mode 100644 index 0000000..2a91579 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreThread; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.*; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.Progressable; +import org.apache.thrift.TException; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +/** + * Super class for all of the compactor test modules. + */ +public abstract class CompactorTest { + static final private String CLASS_NAME = CompactorTest.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + protected CompactionTxnHandler txnHandler; + protected IMetaStoreClient ms; + protected long sleepTime = 1000; + + private List threads = new ArrayList(); + private MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); + private File tmpdir; + + protected CompactorTest() throws Exception { + HiveConf conf = new HiveConf(); + TxnDbUtil.setConfValues(conf); + TxnDbUtil.cleanDb(); + ms = new HiveMetaStoreClient(conf); + txnHandler = new CompactionTxnHandler(conf); + tmpdir = new File(System.getProperty("java.io.tmpdir") + + System.getProperty("file.separator") + "compactor_test_tables"); + tmpdir.mkdir(); + tmpdir.deleteOnExit(); + } + + protected void startInitiator(HiveConf conf) throws Exception { + startThread('i', conf); + } + + protected void startWorker(HiveConf conf) throws Exception { + startThread('w', conf); + } + + protected void startCleaner(HiveConf conf) throws Exception { + startThread('c', conf); + } + + protected void stopThreads() throws Exception { + stop.timeToStop = true; + for (Thread t : threads) t.interrupt(); + } + + protected void joinThreads() throws Exception { + stop.timeToStop = true; + for (Thread t : threads) t.join(); + } + + protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException { + return newTable(dbName, tableName, partitioned, new HashMap(), null); + } + + protected Table newTable(String dbName, String tableName, boolean partitioned, + Map parameters) throws TException { + return newTable(dbName, tableName, partitioned, parameters, null); + + } + + protected Table newTable(String dbName, String tableName, boolean partitioned, + Map parameters, List sortCols) + throws TException { + Table table = new Table(); + table.setTableName(tableName); + table.setDbName(dbName); + table.setOwner("me"); + table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols)); + List partKeys = new ArrayList(1); + if (partitioned) { + partKeys.add(new FieldSchema("ds", "string", "no comment")); + table.setPartitionKeys(partKeys); + } + + table.setParameters(parameters); + + ms.createTable(table); + return table; + } + + protected Partition newPartition(Table t, String value) throws Exception { + return newPartition(t, value, null); + } + + protected Partition newPartition(Table t, String value, List sortCols) throws Exception { + Partition part = new Partition(); + part.addToValues(value); + part.setDbName(t.getDbName()); + part.setTableName(t.getTableName()); + part.setSd(newStorageDescriptor(getLocation(t.getTableName(), value), sortCols)); + part.setParameters(new HashMap()); + ms.add_partition(part); + return part; + } + + protected long openTxn() { + List txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"), + Worker.hostname())).getTxn_ids(); + return txns.get(0); + } + + protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, + int numRecords) throws Exception{ + addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA); + } + + protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn, + int numRecords) throws Exception{ + addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE); + } + + protected void addLegacyFile(HiveConf conf, Table t, Partition p, + int numRecords) throws Exception { + addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY); + } + + protected List getDirectories(HiveConf conf, Table t, Partition p) throws Exception { + String partValue = (p == null) ? null : p.getValues().get(0); + String location = getLocation(t.getTableName(), partValue); + Path dir = new Path(location); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stats = fs.listStatus(dir); + List paths = new ArrayList(stats.length); + for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath()); + return paths; + } + + protected void burnThroughTransactions(int num) throws NoSuchTxnException, TxnAbortedException { + OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost")); + for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid)); + } + + private StorageDescriptor newStorageDescriptor(String location, List sortCols) { + StorageDescriptor sd = new StorageDescriptor(); + List cols = new ArrayList(2); + cols.add(new FieldSchema("a", "varchar(25)", "still no comment")); + cols.add(new FieldSchema("b", "int", "comment")); + sd.setCols(cols); + sd.setLocation(location); + sd.setInputFormat(MockInputFormat.class.getName()); + sd.setOutputFormat(MockOutputFormat.class.getName()); + sd.setNumBuckets(1); + SerDeInfo serde = new SerDeInfo(); + serde.setName(LazySimpleSerDe.class.getName()); + sd.setSerdeInfo(serde); + List bucketCols = new ArrayList(1); + bucketCols.add("a"); + sd.setBucketCols(bucketCols); + + if (sortCols != null) { + sd.setSortCols(sortCols); + } + return sd; + } + + // I can't do this with @Before because I want to be able to control the config file provided + // to each test. + private void startThread(char type, HiveConf conf) throws Exception { + TxnDbUtil.setConfValues(conf); + CompactorThread t = null; + switch (type) { + case 'i': t = new Initiator(); break; + case 'w': t = new Worker(); break; + case 'c': t = new Cleaner(); break; + default: throw new RuntimeException("Huh? Unknown thread type."); + } + threads.add(t); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + t.init(stop); + t.start(); + } + + private String getLocation(String tableName, String partValue) { + String location = tmpdir.getAbsolutePath() + + System.getProperty("file.separator") + tableName; + if (partValue != null) { + location += System.getProperty("file.separator") + "ds=" + partValue; + } + return location; + } + + private enum FileType {BASE, DELTA, LEGACY}; + + private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, + int numRecords, FileType type) throws Exception { + String partValue = (p == null) ? null : p.getValues().get(0); + Path location = new Path(getLocation(t.getTableName(), partValue)); + String filename = null; + switch (type) { + case BASE: filename = "base_" + maxTxn; break; + case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break; + case LEGACY: filename = "00000_0"; + } + + FileSystem fs = FileSystem.get(conf); + Path partFile = null; + if (type == FileType.LEGACY) { + partFile = new Path(location, filename); + } else { + Path dir = new Path(location, filename); + fs.mkdirs(dir); + partFile = new Path(dir, "bucket_00000"); + } + FSDataOutputStream out = fs.create(partFile); + for (int i = 0; i < numRecords; i++) { + out.writeBytes("mary had a little lamb its fleece was white as snow\n"); + } + out.close(); + } + + static class MockInputFormat implements AcidInputFormat { + + @Override + public RowReader getReader(InputSplit split, + Options options) throws + IOException { + return null; + } + + @Override + public RawReader getRawReader(Configuration conf, boolean collapseEvents, int bucket, + Path baseDirectory, Path... deltaDirectory) throws IOException { + + List filesToRead = new ArrayList(); + if (baseDirectory != null) { + if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { + filesToRead.add(AcidUtils.createBucketFile(baseDirectory, bucket)); + } else { + filesToRead.add(new Path(baseDirectory, "00000_0")); + + } + } + for (int i = 0; i < deltaDirectory.length; i++) { + filesToRead.add(AcidUtils.createBucketFile(deltaDirectory[i], bucket)); + } + return new MockRawReader(conf, filesToRead); + } + + @Override + public InputSplit[] getSplits(JobConf entries, int i) throws IOException { + return new InputSplit[0]; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, + Reporter reporter) throws IOException { + return null; + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList files) throws + IOException { + return false; + } + } + + static class MockRawReader implements AcidInputFormat.RawReader { + private Stack filesToRead; + private Configuration conf; + private FSDataInputStream is = null; + private FileSystem fs; + + MockRawReader(Configuration conf, List files) throws IOException { + filesToRead = new Stack(); + for (Path file : files) filesToRead.push(file); + this.conf = conf; + fs = FileSystem.get(conf); + } + + @Override + public ObjectInspector getObjectInspector() { + return null; + } + + @Override + public boolean next(RecordIdentifier identifier, Text text) throws IOException { + if (is == null) { + // Open the next file + if (filesToRead.empty()) return false; + Path p = filesToRead.pop(); + LOG.debug("Reading records from " + p.toString()); + is = fs.open(p); + } + String line = is.readLine(); + if (line == null) { + // Set our current entry to null (since it's done) and try again. + is = null; + return next(identifier, text); + } + text.set(line); + return true; + } + + @Override + public RecordIdentifier createKey() { + return new RecordIdentifier(); + } + + @Override + public Text createValue() { + return new Text(); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } + } + + static class MockOutputFormat implements AcidOutputFormat { + + @Override + public RecordUpdater getRecordUpdater(Path path, Options options) throws + IOException { + return null; + } + + @Override + public FSRecordWriter getRawRecordWriter(Path path, Options options) throws IOException { + return new MockRecordWriter(path, options); + } + + @Override + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, + boolean isCompressed, Properties tableProperties, + Progressable progress) throws IOException { + return null; + } + + @Override + public RecordWriter getRecordWriter(FileSystem fileSystem, JobConf entries, + String s, + Progressable progressable) throws + IOException { + return null; + } + + @Override + public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException { + + } + } + + static class MockRecordWriter implements FSRecordWriter { + private FSDataOutputStream os; + + MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException { + FileSystem fs = FileSystem.get(options.getConfiguration()); + os = fs.create(basedir); + } + + @Override + public void write(Writable w) throws IOException { + Text t = (Text)w; + os.writeBytes(t.toString()); + os.writeBytes("\n"); + } + + @Override + public void close(boolean abort) throws IOException { + os.close(); + } + } + + +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java new file mode 100644 index 0000000..b55805a --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for the compactor Cleaner thread + */ +public class TestCleaner extends CompactorTest { + public TestCleaner() throws Exception { + super(); + } + + @Test + public void nothing() throws Exception { + // Test that the whole things works when there's nothing in the queue. This is just a + // survival test. + startCleaner(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + stopThreads(); + } + + @Test + public void cleanupAfterMajorTableCompaction() throws Exception { + Table t = newTable("default", "camtc", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addBaseFile(conf, t, null, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, null); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_25", paths.get(0).getName()); + stopThreads(); + } + + @Test + public void cleanupAfterMajorPartitionCompaction() throws Exception { + Table t = newTable("default", "campc", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(conf, t, p, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, p); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_25", paths.get(0).getName()); + stopThreads(); + } + + @Test + public void cleanupAfterMinorTableCompaction() throws Exception { + Table t = newTable("default", "camitc", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addDeltaFile(conf, t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, null); + Assert.assertEquals(2, paths.size()); + boolean sawBase = false, sawDelta = false; + for (Path p : paths) { + if (p.getName().equals("base_20")) sawBase = true; + else if (p.getName().equals("delta_21_24")) sawDelta = true; + else Assert.fail("Unexpected file " + p.getName()); + } + Assert.assertTrue(sawBase); + Assert.assertTrue(sawDelta); + stopThreads(); + } + + @Test + public void cleanupAfterMinorPartitionCompaction() throws Exception { + Table t = newTable("default", "camipc", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addDeltaFile(conf, t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, p); + Assert.assertEquals(2, paths.size()); + boolean sawBase = false, sawDelta = false; + for (Path path : paths) { + if (path.getName().equals("base_20")) sawBase = true; + else if (path.getName().equals("delta_21_24")) sawDelta = true; + else Assert.fail("Unexpected file " + path.getName()); + } + Assert.assertTrue(sawBase); + Assert.assertTrue(sawDelta); + stopThreads(); + } + + @Test + public void blockedByLockTable() throws Exception { + Table t = newTable("default", "bblt", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addDeltaFile(conf, t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, + "default", "bblt", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + startCleaner(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + Assert.assertEquals("bblt", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void blockedByLockPartition() throws Exception { + Table t = newTable("default", "bblp", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addDeltaFile(conf, t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "bblp", "ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + startCleaner(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + Assert.assertEquals("bblp", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + stopThreads(); + } + + @Before + public void setUpTxnDb() throws Exception { + TxnDbUtil.setConfValues(new HiveConf()); + TxnDbUtil.prepDb(); + } + + @After + public void tearDownTxnDb() throws Exception { + TxnDbUtil.cleanDb(); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java new file mode 100644 index 0000000..e85e762 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -0,0 +1,582 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the compactor Initiator thread. + */ +public class TestInitiator extends CompactorTest { + static final private String CLASS_NAME = TestInitiator.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + public TestInitiator() throws Exception { + super(); + } + + @Test + public void nothing() throws Exception { + // Test that the whole things works when there's nothing in the queue. This is just a + // survival test. + startInitiator(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + stopThreads(); + } + + @Test + public void recoverFailedLocalWorkers() throws Exception { + Table t = newTable("default", "rflw1", false); + CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR); + txnHandler.compact(rqst); + + t = newTable("default", "rflw2", false); + rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR); + txnHandler.compact(rqst); + + txnHandler.findNextToCompact(Worker.hostname() + "-193892"); + txnHandler.findNextToCompact("nosuchhost-193892"); + + startInitiator(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(2, compacts.size()); + boolean sawInitiated = false; + for (ShowCompactResponseElement c : compacts) { + if (c.getState().equals("working")) { + Assert.assertEquals("nosuchhost-193892", c.getWorkerid()); + } else if (c.getState().equals("initiated")) { + sawInitiated = true; + } else { + Assert.fail("Unexpected state"); + } + } + Assert.assertTrue(sawInitiated); + stopThreads(); + } + + @Test + public void recoverFailedRemoteWorkers() throws Exception { + Table t = newTable("default", "rfrw1", false); + CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR); + txnHandler.compact(rqst); + + txnHandler.findNextToCompact("nosuchhost-193892"); + + HiveConf conf = new HiveConf(); + HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + stopThreads(); + } + + @Test + public void majorCompactOnTableTooManyAborts() throws Exception { + Table t = newTable("default", "mcottma", false); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "mcottma", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("mcottma", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void majorCompactOnPartitionTooManyAborts() throws Exception { + Table t = newTable("default", "mcoptma", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "mcoptma", "ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("mcoptma", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void noCompactOnManyDifferentPartitionAborts() throws Exception { + Table t = newTable("default", "ncomdpa", true); + for (int i = 0; i < 11; i++) { + Partition p = newPartition(t, "day-" + i); + } + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "ncomdpa", "ds=day-" + i); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + stopThreads(); + } + + @Test + public void cleanEmptyAbortedTxns() throws Exception { + // Test that we are cleaning aborted transactions with no components left in txn_components. + // Put one aborted transaction with an entry in txn_components to make sure we don't + // accidently clean it too. + Table t = newTable("default", "ceat", false); + + HiveConf conf = new HiveConf(); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "ceat", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + + for (int i = 0; i < 100; i++) { + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(101, openTxns.getOpen_txnsSize()); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(1, openTxns.getOpen_txnsSize()); + stopThreads(); + } + + @Test + public void noCompactWhenNoCompactSet() throws Exception { + Map parameters = new HashMap(1); + parameters.put("NO_AUTO_COMPACTION", "true"); + Table t = newTable("default", "ncwncs", false, parameters); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "ncwncs", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + stopThreads(); + } + + @Test + public void noCompactWhenCompactAlreadyScheduled() throws Exception { + Table t = newTable("default", "ncwcas", false); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "ncwcas", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + CompactionRequest rqst = new CompactionRequest("default", "ncwcas", CompactionType.MAJOR); + txnHandler.compact(rqst); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("ncwcas", compacts.get(0).getTablename()); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("ncwcas", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void compactTableHighDeltaPct() throws Exception { + Table t = newTable("default", "cthdp", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(23); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "cthdp", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cthdp", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void compactPartitionHighDeltaPct() throws Exception { + Table t = newTable("default", "cphdp", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(23); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, + "default", "cphdp", "ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cphdp", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void noCompactTableDeltaPctNotHighEnough() throws Exception { + Table t = newTable("default", "nctdpnhe", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 50L, 50); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(53); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "nctdpnhe", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + stopThreads(); + } + + @Test + public void compactTableTooManyDeltas() throws Exception { + Table t = newTable("default", "cttmd", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 200L, 200); + addDeltaFile(conf, t, null, 201L, 201L, 1); + addDeltaFile(conf, t, null, 202L, 202L, 1); + addDeltaFile(conf, t, null, 203L, 203L, 1); + addDeltaFile(conf, t, null, 204L, 204L, 1); + addDeltaFile(conf, t, null, 205L, 205L, 1); + addDeltaFile(conf, t, null, 206L, 206L, 1); + addDeltaFile(conf, t, null, 207L, 207L, 1); + addDeltaFile(conf, t, null, 208L, 208L, 1); + addDeltaFile(conf, t, null, 209L, 209L, 1); + addDeltaFile(conf, t, null, 210L, 210L, 1); + addDeltaFile(conf, t, null, 211L, 211L, 1); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "cttmd", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cttmd", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void compactPartitionTooManyDeltas() throws Exception { + Table t = newTable("default", "cptmd", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 200L, 200); + addDeltaFile(conf, t, p, 201L, 201L, 1); + addDeltaFile(conf, t, p, 202L, 202L, 1); + addDeltaFile(conf, t, p, 203L, 203L, 1); + addDeltaFile(conf, t, p, 204L, 204L, 1); + addDeltaFile(conf, t, p, 205L, 205L, 1); + addDeltaFile(conf, t, p, 206L, 206L, 1); + addDeltaFile(conf, t, p, 207L, 207L, 1); + addDeltaFile(conf, t, p, 208L, 208L, 1); + addDeltaFile(conf, t, p, 209L, 209L, 1); + addDeltaFile(conf, t, p, 210L, 210L, 1); + addDeltaFile(conf, t, p, 211L, 211L, 1); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, + "default", "cptmd", "ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cptmd", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + stopThreads(); + } + + @Test + public void noCompactTableNotEnoughDeltas() throws Exception { + Table t = newTable("default", "nctned", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 200L, 200); + addDeltaFile(conf, t, null, 201L, 205L, 5); + addDeltaFile(conf, t, null, 206L, 211L, 6); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "nctned", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + stopThreads(); + } + + @Test + public void chooseMajorOverMinorWhenBothValid() throws Exception { + Table t = newTable("default", "cmomwbv", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 200L, 200); + addDeltaFile(conf, t, null, 201L, 211L, 11); + addDeltaFile(conf, t, null, 212L, 222L, 11); + addDeltaFile(conf, t, null, 223L, 233L, 11); + addDeltaFile(conf, t, null, 234L, 244L, 11); + addDeltaFile(conf, t, null, 245L, 255L, 11); + addDeltaFile(conf, t, null, 256L, 266L, 11); + addDeltaFile(conf, t, null, 267L, 277L, 11); + addDeltaFile(conf, t, null, 278L, 288L, 11); + addDeltaFile(conf, t, null, 289L, 299L, 11); + addDeltaFile(conf, t, null, 300L, 310L, 11); + addDeltaFile(conf, t, null, 311L, 321L, 11); + + burnThroughTransactions(320); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, + "default", "cmomwbv", null); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cmomwbv", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + stopThreads(); + } + + // TODO test compactions with legacy file types + + @Before + public void setUpTxnDb() throws Exception { + TxnDbUtil.setConfValues(new HiveConf()); + TxnDbUtil.prepDb(); + } + + @After + public void tearDownTxnDb() throws Exception { + TxnDbUtil.cleanDb(); + } + +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java new file mode 100644 index 0000000..3c21a5d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the worker thread and its MR jobs. + */ +public class TestWorker extends CompactorTest { + static final private String CLASS_NAME = TestWorker.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + public TestWorker() throws Exception { + super(); + } + + @Test + public void nothing() throws Exception { + // Test that the whole things works when there's nothing in the queue. This is just a + // survival test. + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + stopThreads(); + } + + @Test + public void stringableMap() throws Exception { + // Empty map case + CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap()); + String s = m.toString(); + Assert.assertEquals("0:", s); + m = new CompactorMR.StringableMap(s); + Assert.assertEquals(0, m.size()); + + Map base = new HashMap(); + base.put("mary", "poppins"); + base.put("bert", null); + base.put(null, "banks"); + m = new CompactorMR.StringableMap(base); + s = m.toString(); + m = new CompactorMR.StringableMap(s); + Assert.assertEquals(3, m.size()); + Map saw = new HashMap(3); + saw.put("mary", false); + saw.put("bert", false); + saw.put(null, false); + for (Map.Entry e : m.entrySet()) { + saw.put(e.getKey(), true); + if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue()); + else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue()); + else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue()); + else Assert.fail("Unexpected value " + e.getKey()); + } + Assert.assertEquals(3, saw.size()); + Assert.assertTrue(saw.get("mary")); + Assert.assertTrue(saw.get("bert")); + Assert.assertTrue(saw.get(null)); + } + + @Test + public void stringableList() throws Exception { + // Empty list case + CompactorMR.StringableList ls = new CompactorMR.StringableList(); + String s = ls.toString(); + Assert.assertEquals("0:", s); + ls = new CompactorMR.StringableList(s); + Assert.assertEquals(0, ls.size()); + + ls = new CompactorMR.StringableList(); + ls.add(new Path("/tmp")); + ls.add(new Path("/usr")); + s = ls.toString(); + Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, + "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s)); + ls = new CompactorMR.StringableList(s); + Assert.assertEquals(2, ls.size()); + boolean sawTmp = false, sawUsr = false; + for (Path p : ls) { + if ("/tmp".equals(p.toString())) sawTmp = true; + else if ("/usr".equals(p.toString())) sawUsr = true; + else Assert.fail("Unexpected path " + p.toString()); + } + Assert.assertTrue(sawTmp); + Assert.assertTrue(sawUsr); + } + + @Test + public void inputSplit() throws Exception { + String basename = "/warehouse/foo/base_1"; + String delta1 = "/warehouse/foo/delta_2_3"; + String delta2 = "/warehouse/foo/delta_4_7"; + + HiveConf conf = new HiveConf(); + Path file = new Path(System.getProperty("java.io.tmpdir") + + System.getProperty("file.separator") + "newWriteInputSplitTest"); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream os = fs.create(file); + for (int i = 0; i < 10; i++) { + os.writeBytes("mary had a little lamb its fleece was white as snow\n"); + } + os.close(); + List files = new ArrayList(1); + files.add(file); + + Path[] deltas = new Path[2]; + deltas[0] = new Path(delta1); + deltas[1] = new Path(delta2); + + CompactorMR.CompactorInputSplit split = + new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas); + + Assert.assertEquals(520L, split.getLength()); + String[] locations = split.getLocations(); + Assert.assertEquals(1, locations.length); + Assert.assertEquals("localhost", locations[0]); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(buf); + split.write(out); + + split = new CompactorMR.CompactorInputSplit(); + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + split.readFields(in); + + Assert.assertEquals(3, split.getBucket()); + Assert.assertEquals(basename, split.getBaseDir().toString()); + deltas = split.getDeltaDirs(); + Assert.assertEquals(2, deltas.length); + Assert.assertEquals(delta1, deltas[0].toString()); + Assert.assertEquals(delta2, deltas[1].toString()); + } + + @Test + public void inputSplitNullBase() throws Exception { + String delta1 = "/warehouse/foo/delta_2_3"; + String delta2 = "/warehouse/foo/delta_4_7"; + + HiveConf conf = new HiveConf(); + Path file = new Path(System.getProperty("java.io.tmpdir") + + System.getProperty("file.separator") + "newWriteInputSplitTest"); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream os = fs.create(file); + for (int i = 0; i < 10; i++) { + os.writeBytes("mary had a little lamb its fleece was white as snow\n"); + } + os.close(); + List files = new ArrayList(1); + files.add(file); + + Path[] deltas = new Path[2]; + deltas[0] = new Path(delta1); + deltas[1] = new Path(delta2); + + CompactorMR.CompactorInputSplit split = + new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(buf); + split.write(out); + + split = new CompactorMR.CompactorInputSplit(); + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + split.readFields(in); + + Assert.assertEquals(3, split.getBucket()); + Assert.assertNull(split.getBaseDir()); + deltas = split.getDeltaDirs(); + Assert.assertEquals(2, deltas.length); + Assert.assertEquals(delta1, deltas[0].toString()); + Assert.assertEquals(delta2, deltas[1].toString()); + } + + @Test + public void sortedTable() throws Exception { + List sortCols = new ArrayList(1); + sortCols.add(new Order("b", 1)); + + Table t = newTable("default", "st", false, new HashMap(), sortCols); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addDeltaFile(conf, t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + stopThreads(); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + } + + @Test + public void sortedPartition() throws Exception { + List sortCols = new ArrayList(1); + sortCols.add(new Order("b", 1)); + + Table t = newTable("default", "sp", true, new HashMap(), sortCols); + Partition p = newPartition(t, "today", sortCols); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addDeltaFile(conf, t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "sp", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + stopThreads(); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + } + + @Test + public void minorTableWithBase() throws Exception { + LOG.debug("Starting minorTableWithBase"); + Table t = newTable("default", "mtwb", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(conf); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still now be 5 directories in the location + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000021_0000024")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(208L, buckets[0].getLen()); + } else { + LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void minorPartitionWithBase() throws Exception { + Table t = newTable("default", "mpwb", true); + Partition p = newPartition(t, "today"); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mpwb", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000021_0000024")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(208L, buckets[0].getLen()); + } else { + LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void minorTableNoBase() throws Exception { + LOG.debug("Starting minorTableWithBase"); + Table t = newTable("default", "mtnb", false); + + HiveConf conf = new HiveConf(); + + addDeltaFile(conf, t, null, 1L, 2L, 2); + addDeltaFile(conf, t, null, 3L, 4L, 2); + + burnThroughTransactions(5); + + CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still now be 5 directories in the location + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(3, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000001_0000004")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(208L, buckets[0].getLen()); + } else { + LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void majorTableWithBase() throws Exception { + LOG.debug("Starting majorTableWithBase"); + Table t = newTable("default", "matwb", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "matwb", CompactionType.MAJOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still now be 5 directories in the location + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(1248L, buckets[0].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void majorPartitionWithBase() throws Exception { + Table t = newTable("default", "mapwb", true); + Partition p = newPartition(t, "today"); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mapwb", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(1248L, buckets[0].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void majorTableNoBase() throws Exception { + LOG.debug("Starting majorTableNoBase"); + Table t = newTable("default", "matnb", false); + + HiveConf conf = new HiveConf(); + + addDeltaFile(conf, t, null, 1L, 2L, 2); + addDeltaFile(conf, t, null, 3L, 4L, 2); + + burnThroughTransactions(5); + + CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still now be 5 directories in the location + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(3, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000004")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(208L, buckets[0].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void majorTableLegacy() throws Exception { + LOG.debug("Starting majorTableLegacy"); + Table t = newTable("default", "matl", false); + + HiveConf conf = new HiveConf(); + + addLegacyFile(conf, t, null, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still now be 5 directories in the location + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + //Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(1248L, buckets[0].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void minorTableLegacy() throws Exception { + LOG.debug("Starting minorTableLegacy"); + Table t = newTable("default", "mtl", false); + + HiveConf conf = new HiveConf(); + + addLegacyFile(conf, t, null, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + Thread.sleep(sleepTime); // should be long enough to get through the loop + joinThreads(); + + // There should still now be 5 directories in the location + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + //Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000021_0000024")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals(208L, buckets[0].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Before + public void setUpTxnDb() throws Exception { + TxnDbUtil.setConfValues(new HiveConf()); + TxnDbUtil.prepDb(); + } + + @After + public void tearDownTxnDb() throws Exception { + TxnDbUtil.cleanDb(); + } +}