diff --git a/ql/pom.xml b/ql/pom.xml
index 7087a4c..ccddb96 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -342,6 +342,13 @@
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop-23.version}</version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
new file mode 100644
index 0000000..d9dd4e4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * A class to clean directories after compactions. This will run in a separate thread.
+ */
+public class Cleaner extends CompactorThread {
+ static final private String CLASS_NAME = Cleaner.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+ static final private int threadId = 10001;
+
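+ // How long to sleep, in milliseconds, between scans of the compaction queue for entries
+ // that are ready to be cleaned.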
+ private long cleanerCheckInterval = 5000;
+
+ @Override
+ public void run() {
+ // Make sure nothing escapes this run method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
+ while (!stop.timeToStop) {
+ try {
+ long startedAt = System.currentTimeMillis();
+
+ // Now look for new entries ready to be cleaned.
+ List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ for (CompactionInfo ci : toClean) {
+ LockComponent comp = null;
+ if (ci.partName == null) {
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname,
+ ci.tableName, null);
+ } else {
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.PARTITION, ci.dbname,
+ ci.tableName, ci.partName);
+ }
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest rqst = new LockRequest(components, System.getProperty("user.name"),
+ Worker.hostname());
+ LockResponse rsp = txnHandler.lockNoWait(rqst);
+ try {
+ if (rsp.getState() == LockState.ACQUIRED) {
+ clean(ci);
+ }
+ } finally {
+ txnHandler.unlock(new UnlockRequest(rsp.getLockid()));
+ }
+ }
+
+ // Now, go back to bed until it's time to do this again
+ long elapsedTime = System.currentTimeMillis() - startedAt;
+ if (elapsedTime >= cleanerCheckInterval) continue;
+ else Thread.sleep(cleanerCheckInterval - elapsedTime);
+ } catch (Throwable t) {
+ LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+ StringUtils.stringifyException(t));
+ }
+ }
+ }
+
+ private void clean(CompactionInfo ci) {
+ String s = "Starting cleaning for " + ci.getFullPartitionName();
+ LOG.info(s);
+ try {
+ StorageDescriptor sd = resolveStorageDescriptor(resolveTable(ci), resolvePartition(ci));
+ String location = sd.getLocation();
+
+ // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
+ // transactions. This assures that all deltas are treated as valid and all we return are
+ // obsolete files.
+ GetOpenTxnsResponse rsp = new GetOpenTxnsResponse(Long.MAX_VALUE, new HashSet<Long>());
+ IMetaStoreClient.ValidTxnList txnList = new HiveMetaStoreClient.ValidTxnListImpl(rsp);
+
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList);
+ List<FileStatus> obsoleteDirs = dir.getObsolete();
+ final List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+ for (FileStatus stat : obsoleteDirs) {
+ filesToDelete.add(stat.getPath());
+ }
+ if (filesToDelete.size() < 1) {
+ LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
+ ", that hardly seems right.");
+ return;
+ }
+ final FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+
+ if (runJobAsSelf(ci.runAs)) {
+ removeFiles(fs, filesToDelete);
+ } else {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
+ UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ removeFiles(fs, filesToDelete);
+ return null;
+ }
+ });
+ }
+
+ } catch (Exception e) {
+ LOG.error("Caught exception when cleaning, unable to complete cleaning " +
+ StringUtils.stringifyException(e));
+ } finally {
+ // We need to clean this out one way or another.
+ txnHandler.markCleaned(ci);
+ }
+
+ }
+
+ private void removeFiles(FileSystem fs, List<Path> deadFilesWalking) throws IOException {
+ for (Path dead : deadFilesWalking) {
+ LOG.debug("Doing to delete path " + dead.toString());
+ fs.delete(dead, true);
+ }
+
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
new file mode 100644
index 0000000..979b07a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -0,0 +1,519 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.*;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.*;
+
+/**
+ * Class to do compactions via an MR job. This has to be in the ql package rather than in the
+ * metastore.compactions package with all of its relatives because it needs access to the actual
+ * input and output formats, which are in ql. ql depends on metastore, and we can't have a
+ * circular dependency.
+ */
+public class CompactorMR {
+
+ static final private String CLASS_NAME = CompactorMR.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ static final private String INPUT_FORMAT_CLASS_NAME = "hive.compactor.input.format.class.name";
+ static final private String OUTPUT_FORMAT_CLASS_NAME = "hive.compactor.output.format.class.name";
+ static final private String LOCATION = "hive.compactor.input.dir";
+ static final private String SERDE = "hive.compactor.serde";
+ static final private String MIN_TXN = "hive.compactor.txn.min";
+ static final private String MAX_TXN = "hive.compactor.txn.max";
+ static final private String IS_MAJOR = "hive.compactor.is.major";
+ static final private String IS_COMPRESSED = "hive.compactor.is.compressed";
+ static final private String TABLE_PROPS = "hive.compactor.table.props";
+ static final private String NUM_BUCKETS = "hive.compactor.num.buckets";
+ static final private String BASE_DIR = "hive.compactor.base.dir";
+ static final private String DELTA_DIRS = "hive.compactor.delta.dirs";
+ static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";
+
+ public CompactorMR() {
+ }
+
+ /**
+ * Run a compactor job.
+ * @param conf Hive configuration file
+ * @param jobName name to run this job with
+ * @param t metastore table
+ * @param sd metastore storage descriptor
+ * @param txns list of valid transactions
+ * @param isMajor is this a major compaction?
+ */
+ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
+ IMetaStoreClient.ValidTxnList txns, boolean isMajor) {
+ try {
+ JobConf job = new JobConf(conf);
+ job.setJobName(jobName);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setJarByClass(CompactorMR.class);
+ LOG.debug("User jar set to " + job.getJar());
+ job.setMapperClass(CompactorMap.class);
+ job.setNumReduceTasks(0);
+ job.setInputFormat(CompactorInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.set(LOCATION, sd.getLocation());
+ job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
+ job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
+ job.set(SERDE, sd.getSerdeInfo().getName());
+ job.setBoolean(IS_MAJOR, isMajor);
+ job.setBoolean(IS_COMPRESSED, sd.isCompressed());
+ job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
+ job.setInt(NUM_BUCKETS, sd.getBucketColsSize());
+
+ // Figure out and encode what files we need to read. We do this here (rather than in
+ // getSplits below) because as part of this we discover our minimum and maximum transactions,
+ // and discovering that in getSplits is too late as we then have no way to pass it to our
+ // mapper.
+
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+ StringableList dirsToSearch = new StringableList();
+ Path baseDir = null;
+ if (isMajor) {
+ // There may not be a base dir if the partition was empty before inserts or if this
+ // partition is just now being converted to ACID.
+ baseDir = dir.getBaseDirectory();
+ if (baseDir == null) {
+ List<FileStatus> originalFiles = dir.getOriginalFiles();
+ if (originalFiles != null && originalFiles.size() > 0) {
+ // There are original format files
+ for (FileStatus stat : originalFiles) {
+ dirsToSearch.add(stat.getPath());
+ LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search");
+ }
+ // Set base to the location so that the input format reads the original files.
+ baseDir = new Path(sd.getLocation());
+
+ /*
+ Path origDir = new Path(sd.getLocation());
+ dirsToSearch.add(origDir);
+ LOG.debug("Adding original directory " + origDir + " to dirs to search");
+ */
+ }
+ } else {
+ // add our base to the list of directories to search for files in.
+ LOG.debug("Adding base directory " + baseDir + " to dirs to search");
+ dirsToSearch.add(baseDir);
+ }
+ }
+
+ List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
+
+ if (parsedDeltas == null || parsedDeltas.size() == 0) {
+ // Seriously, no deltas? Can't compact that.
+ LOG.error("No delta files found to compact in " + sd.getLocation());
+ return;
+ }
+
+ StringableList deltaDirs = new StringableList();
+ long minTxn = Long.MAX_VALUE;
+ long maxTxn = Long.MIN_VALUE;
+ for (AcidUtils.ParsedDelta delta : parsedDeltas) {
+ LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
+ dirsToSearch.add(delta.getPath());
+ deltaDirs.add(delta.getPath());
+ minTxn = Math.min(minTxn, delta.getMinTransaction());
+ maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
+ }
+
+ if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
+ job.set(DELTA_DIRS, deltaDirs.toString());
+ job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
+ job.setLong(MIN_TXN, minTxn);
+ job.setLong(MAX_TXN, maxTxn);
+ LOG.debug("Setting minimum transaction to " + minTxn);
+ LOG.debug("Setting maximume transaction to " + maxTxn);
+
+ JobClient.runJob(job);
+ } catch (Throwable e) {
+ // Don't let anything past us.
+ LOG.error("Running MR job " + jobName + " to compact failed, " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ static class CompactorInputSplit implements InputSplit {
+ private long length = 0;
+ private List<String> locations;
+ private int bucketNum;
+ private Path base;
+ private Path[] deltas;
+
+ public CompactorInputSplit() {
+ }
+
+ /**
+ *
+ * @param hadoopConf
+ * @param bucket bucket to be processed by this split
+ * @param files actual files this split should process. It is assumed the caller has already
+ * parsed out the files in base and deltas to populate this list.
+ * @param base directory of the base, or the partition/table location if the files are in old
+ * style. Can be null.
+ * @param deltas directories of the delta files.
+ * @throws IOException
+ */
+ CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base,
+ Path[] deltas)
+ throws IOException {
+ bucketNum = bucket;
+ this.base = base;
+ this.deltas = deltas;
+ locations = new ArrayList<String>();
+
+ for (Path path : files) {
+ FileSystem fs = path.getFileSystem(hadoopConf);
+ FileStatus stat = fs.getFileStatus(path);
+ length += stat.getLen();
+ BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, length);
+ for (int i = 0; i < locs.length; i++) {
+ String[] hosts = locs[i].getHosts();
+ for (int j = 0; j < hosts.length; j++) {
+ locations.add(hosts[j]);
+ }
+ }
+ }
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return locations.toArray(new String[locations.size()]);
+ }
+
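+ // The split is hand-serialized as length-prefixed strings: the locations, then the base
+ // directory, then the delta directories, so it can be shipped to the map tasks.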
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(length);
+ dataOutput.writeInt(locations.size());
+ for (int i = 0; i < locations.size(); i++) {
+ dataOutput.writeInt(locations.get(i).length());
+ dataOutput.writeBytes(locations.get(i));
+ }
+ dataOutput.writeInt(bucketNum);
+ if (base == null) {
+ dataOutput.writeInt(0);
+ } else {
+ dataOutput.writeInt(base.toString().length());
+ dataOutput.writeBytes(base.toString());
+ }
+ dataOutput.writeInt(deltas.length);
+ for (int i = 0; i < deltas.length; i++) {
+ dataOutput.writeInt(deltas[i].toString().length());
+ dataOutput.writeBytes(deltas[i].toString());
+ }
+
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ int len;
+ byte[] buf;
+
+ locations = new ArrayList<String>();
+ length = dataInput.readLong();
+ LOG.debug("Read length of " + length);
+ int numElements = dataInput.readInt();
+ LOG.debug("Read numElements of " + numElements);
+ for (int i = 0; i < numElements; i++) {
+ len = dataInput.readInt();
+ LOG.debug("Read file length of " + len);
+ buf = new byte[len];
+ dataInput.readFully(buf);
+ locations.add(new String(buf));
+ }
+ bucketNum = dataInput.readInt();
+ LOG.debug("Read bucket number of " + bucketNum);
+ len = dataInput.readInt();
+ LOG.debug("Read base path length of " + len);
+ if (len > 0) {
+ buf = new byte[len];
+ dataInput.readFully(buf);
+ base = new Path(new String(buf));
+ }
+ numElements = dataInput.readInt();
+ deltas = new Path[numElements];
+ for (int i = 0; i < numElements; i++) {
+ len = dataInput.readInt();
+ buf = new byte[len];
+ dataInput.readFully(buf);
+ deltas[i] = new Path(new String(buf));
+ }
+ }
+
+ int getBucket() {
+ return bucketNum;
+ }
+
+ Path getBaseDir() {
+ return base;
+ }
+
+ Path[] getDeltaDirs() {
+ return deltas;
+ }
+ }
+
+ static class CompactorInputFormat
+ implements InputFormat {
+
+ @Override
+ public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
+ Path baseDir = null;
+ if (entries.get(BASE_DIR) != null) baseDir = new Path(entries.get(BASE_DIR));
+ StringableList deltaDirs = new StringableList(entries.get(DELTA_DIRS));
+ StringableList dirsToSearch = new StringableList(entries.get(DIRS_TO_SEARCH));
+
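+ // One split is generated per bucket; each split compacts the matching bucket file from the
+ // base (or the legacy files) and from every delta directory.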
+ InputSplit[] splits = new InputSplit[entries.getInt(NUM_BUCKETS, 1)];
+ for (int bucket = 0; bucket < splits.length; bucket++) {
+
+ // Go find the actual files to read. This will change for each split.
+ List<Path> filesToRead = new ArrayList<Path>();
+ for (Path d : dirsToSearch) {
+ // If this is a base or delta directory, then we need to be looking for a bucket file.
+ // But if it's a legacy file then we need to add it directly.
+ if (d.getName().startsWith(AcidUtils.BASE_PREFIX) ||
+ d.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+ Path bucketFile = AcidUtils.createBucketFile(d, bucket);
+ filesToRead.add(bucketFile);
+ LOG.debug("Adding " + bucketFile.toString() + " to files to read");
+ } else {
+ filesToRead.add(d);
+ LOG.debug("Adding " + d + " to files to read");
+ }
+ }
+ splits[bucket] = new CompactorInputSplit(entries, bucket, filesToRead, baseDir,
+ deltaDirs.toArray(new Path[deltaDirs.size()]));
+ }
+ LOG.debug("Returning " + splits.length + " splits");
+ return splits;
+ }
+
+ @Override
+ public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries,
+ Reporter reporter) throws IOException {
+ CompactorInputSplit split = (CompactorInputSplit)inputSplit;
+ AcidInputFormat aif =
+ instantiate(AcidInputFormat.class, entries.get(INPUT_FORMAT_CLASS_NAME));
+ return aif.getRawReader(entries, entries.getBoolean(IS_MAJOR, false), split.getBucket(),
+ split.getBaseDir(), split.getDeltaDirs());
+ }
+ }
+
+ static class CompactorMap<V extends Writable>
+ implements Mapper<RecordIdentifier, V, NullWritable, V> {
+
+ JobConf jobConf;
+ FSRecordWriter writer;
+
+ @Override
+ public void map(RecordIdentifier identifier, V value,
+ OutputCollector<NullWritable, V> nullWritableVOutputCollector,
+ Reporter reporter) throws IOException {
+ // After all this setup there's actually almost nothing to do.
+ getWriter(reporter, identifier.getBucketId()); // Make sure we've opened the writer
+ writer.write(value);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (writer != null) writer.close(false);
+ }
+
+ @Override
+ public void configure(JobConf entries) {
+ jobConf = entries;
+ }
+
+ private void getWriter(Reporter reporter, int bucket) throws IOException {
+ if (writer == null) {
+ AbstractSerDe serde = instantiate(AbstractSerDe.class, jobConf.get(SERDE));
+ ObjectInspector inspector;
+ try {
+ inspector = serde.getObjectInspector();
+ } catch (SerDeException e) {
+ LOG.error("Unable to get object inspector, " + StringUtils.stringifyException(e));
+ throw new IOException(e);
+ }
+
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+ options.inspector(inspector)
+ .writingBase(jobConf.getBoolean(IS_MAJOR, false))
+ .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+ .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
+ .reporter(reporter)
+ .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+ .bucket(bucket);
+
+ // Instantiate the underlying output format
+ AcidOutputFormat aof =
+ instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+
+ Path location = AcidUtils.createFilename(new Path(jobConf.get(LOCATION)), options);
+ writer = aof.getRawRecordWriter(location, options);
+ }
+ }
+ }
+
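+ /**
+ * A Map<String, String> that can round-trip itself through a single configuration string.
+ * The encoding is length-prefixed: "<numEntries>:<keyLen>:<key><valueLen>:<value>...".
+ * For example, a map containing only {"a" -> "bc"} serializes to "1:1:a2:bc".
+ */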
+ static class StringableMap extends HashMap<String, String> {
+
+ StringableMap(String s) {
+ String[] parts = s.split(":", 2);
+ // read that many chars
+ int numElements = Integer.valueOf(parts[0]);
+ s = parts[1];
+ for (int i = 0; i < numElements; i++) {
+ parts = s.split(":", 2);
+ int len = Integer.valueOf(parts[0]);
+ String key = null;
+ if (len > 0) key = parts[1].substring(0, len);
+ parts = parts[1].substring(len).split(":", 2);
+ len = Integer.valueOf(parts[0]);
+ String value = null;
+ if (len > 0) value = parts[1].substring(0, len);
+ s = parts[1].substring(len);
+ put(key, value);
+ }
+ }
+
+ StringableMap(Map<String, String> m) {
+ super(m);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append(size());
+ buf.append(':');
+ if (size() > 0) {
+ for (Map.Entry<String, String> entry : entrySet()) {
+ int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
+ buf.append(entry.getKey() == null ? 0 : length);
+ buf.append(':');
+ if (length > 0) buf.append(entry.getKey());
+ length = (entry.getValue() == null) ? 0 : entry.getValue().length();
+ buf.append(length);
+ buf.append(':');
+ if (length > 0) buf.append(entry.getValue());
+ }
+ }
+ return buf.toString();
+ }
+
+ public Properties toProperties() {
+ Properties props = new Properties();
+ props.putAll(this);
+ return props;
+ }
+ }
+
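+ /**
+ * A List<Path> using the same length-prefixed string encoding as StringableMap. For example,
+ * a list holding the single path "/tmp/x" serializes to "1:6:/tmp/x".
+ */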
+ static class StringableList extends ArrayList<Path> {
+ StringableList() {
+
+ }
+
+ StringableList(String s) {
+ String[] parts = s.split(":", 2);
+ // read that many chars
+ int numElements = Integer.valueOf(parts[0]);
+ s = parts[1];
+ for (int i = 0; i < numElements; i++) {
+ parts = s.split(":", 2);
+ int len = Integer.valueOf(parts[0]);
+ String val = parts[1].substring(0, len);
+ s = parts[1].substring(len);
+ add(new Path(val));
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append(size());
+ buf.append(':');
+ if (size() > 0) {
+ for (Path p : this) {
+ buf.append(p.toString().length());
+ buf.append(':');
+ buf.append(p.toString());
+ }
+ }
+ return buf.toString();
+ }
+ }
+
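+ /**
+ * Instantiate a class by name via its no-argument constructor and verify that it is
+ * assignable to the requested type, wrapping any reflection failure in an IOException.
+ */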
+ private static <T> T instantiate(Class<T> classType, String classname) throws IOException {
+ T t = null;
+ try {
+ Class<?> c = Class.forName(classname);
+ Object o = c.newInstance();
+ if (classType.isAssignableFrom(o.getClass())) {
+ t = (T)o;
+ } else {
+ String s = classname + " is not an instance of " + classType.getName();
+ LOG.error(s);
+ throw new IOException(s);
+ }
+ } catch (ClassNotFoundException e) {
+ LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
+ throw new IOException(e);
+ }
+ return t;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
new file mode 100644
index 0000000..cdb2660
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RetryingRawStore;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Superclass for all threads in the compactor.
+ */
+abstract class CompactorThread extends Thread implements MetaStoreThread {
+ static final private String CLASS_NAME = CompactorThread.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ protected HiveConf conf;
+ protected CompactionTxnHandler txnHandler;
+ protected RawStore rs;
+ protected int threadId;
+ protected BooleanPointer stop;
+
+ @Override
+ public void setHiveConf(HiveConf conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void setThreadId(int threadId) {
+ this.threadId = threadId;
+
+ }
+
+ @Override
+ public void init(BooleanPointer stop) throws MetaException {
+ this.stop = stop;
+ setPriority(MIN_PRIORITY);
+ setDaemon(true); // this means the process will exit without waiting for this thread
+
+ // Get our own instance of the transaction handler
+ txnHandler = new CompactionTxnHandler(conf);
+
+ // Get our own connection to the database so we can get table and partition information.
+ rs = RetryingRawStore.getProxy(conf, conf,
+ conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId);
+ }
+
+ /**
+ * Find the table being compacted
+ * @param ci compaction info returned from the compaction queue
+ * @return metastore table
+ * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found.
+ */
+ protected Table resolveTable(CompactionInfo ci) throws MetaException {
+ try {
+ return rs.getTable(ci.dbname, ci.tableName);
+ } catch (MetaException e) {
+ LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
+ throw e;
+ }
+ }
+
+ /**
+ * Get the partition being compacted.
+ * @param ci compaction info returned from the compaction queue
+ * @return metastore partition, or null if there is no partition in this compaction info
+ * @throws Exception if underlying calls throw, or if the partition name resolves to more than
+ * one partition.
+ */
+ protected Partition resolvePartition(CompactionInfo ci) throws Exception {
+ Partition p = null;
+ if (ci.partName != null) {
+ List<String> names = new ArrayList<String>(1);
+ names.add(ci.partName);
+ List<Partition> parts = null;
+ try {
+ parts = rs.getPartitionsByNames(ci.dbname, ci.tableName, names);
+ } catch (Exception e) {
+ LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage());
+ throw e;
+ }
+ if (parts.size() != 1) {
+ LOG.error(ci.getFullPartitionName() + " does not refer to a single partition");
+ throw new MetaException("Too many partitions");
+ }
+ return parts.get(0);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get the storage descriptor for a compaction.
+ * @param t table from {@link #resolveTable(org.apache.hadoop.hive.metastore.txn.CompactionInfo)}
+ * @param p partition from {@link #resolvePartition(org.apache.hadoop.hive.metastore.txn.CompactionInfo)}
+ * @return metastore storage descriptor.
+ */
+ protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
+ return (p == null) ? t.getSd() : p.getSd();
+ }
+
+ /**
+ * Determine which user to run an operation as, based on the owner of the directory to be
+ * compacted. It is asserted that either the user running the hive metastore or the table
+ * owner must be able to stat the directory and determine the owner.
+ * @param location directory that will be read or written to.
+ * @param t metastore table object
+ * @return username of the owner of the location.
+ * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat
+ * the location.
+ */
+ protected String findUserToRunAs(String location, Table t) throws IOException,
+ InterruptedException {
+ LOG.debug("Determining who to run the job as.");
+ final Path p = new Path(location);
+ final FileSystem fs = p.getFileSystem(conf);
+ try {
+ FileStatus stat = fs.getFileStatus(p);
+ LOG.debug("Running job as " + stat.getOwner());
+ return stat.getOwner();
+ } catch (AccessControlException e) {
+ // TODO not sure this is the right exception
+ LOG.debug("Unable to stat file as current user, trying as table owner");
+
+ // Now, try it as the table owner and see if we get better luck.
+ final List<String> wrapper = new ArrayList<String>(1);
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+ UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ FileStatus stat = fs.getFileStatus(p);
+ wrapper.add(stat.getOwner());
+ return null;
+ }
+ });
+
+ if (wrapper.size() == 1) {
+ LOG.debug("Running job as " + wrapper.get(0));
+ return wrapper.get(0);
+ }
+ }
+ LOG.error("Unable to stat file as either current user or table owner, giving up");
+ throw new IOException("Unable to stat file");
+ }
+
+ /**
+ * Determine whether to run this job as the current user or whether we need a doAs to switch
+ * users.
+ * @param owner of the directory we will be working in, as determined by
+ * {@link #findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)}
+ * @return true if the job should run as the current user, false if a doAs is needed.
+ */
+ protected boolean runJobAsSelf(String owner) {
+ return (owner.equals(System.getProperty("user.name")));
+ }
+
+ protected String tableName(Table t) {
+ return t.getDbName() + "." + t.getTableName();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
new file mode 100644
index 0000000..33169d4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A class to initiate compactions. This will run in a separate thread.
+ */
+public class Initiator extends CompactorThread {
+ static final private String CLASS_NAME = Initiator.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+ static final private int threadId = 10000;
+
+ static final private String NO_COMPACTION = "NO_AUTO_COMPACTION";
+
+ private long checkInterval;
+
+ @Override
+ public void run() {
+ // Make sure nothing escapes this run method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
+ try {
+ recoverFailedCompactions(false);
+
+ int abortedThreshold = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+
+ while (!stop.timeToStop) {
+ long startedAt = System.currentTimeMillis();
+
+ // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
+ // don't doom the entire thread.
+ try {
+ ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ IMetaStoreClient.ValidTxnList txns = new HiveMetaStoreClient.ValidTxnListImpl(openTxns);
+ List<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
+ LOG.debug("Found " + potentials.size() + " potential compactions, " +
+ "checking to see if we should compact any of them");
+ for (CompactionInfo ci : potentials) {
+ LOG.debug("Checking to see if we should compact " + ci.getFullPartitionName());
+ try {
+ Table t = resolveTable(ci);
+ // check if no compaction set for this table
+ if (t.getParameters().get(NO_COMPACTION) != null) {
+ LOG.info("Table " + tableName(t) + " marked " + NO_COMPACTION +
+ " so we will not compact it.");
+ continue;
+ }
+
+ // Check if we already have initiated or are working on a compaction for this partition
+ // or table. If so, skip it. If we are just waiting on cleaning we can still check,
+ // as it may be time to compact again even though we haven't cleaned.
+ if (lookForCurrentCompactions(currentCompactions, ci)) {
+ LOG.debug("Found currently initiated or working compaction for " +
+ ci.getFullPartitionName() + " so we will not initiate another compaction");
+ continue;
+ }
+
+ // Figure out who we should run the file operations as
+ Partition p = resolvePartition(ci);
+ StorageDescriptor sd = resolveStorageDescriptor(t, p);
+ String runAs = findUserToRunAs(sd.getLocation(), t);
+
+ if (checkForMajor(ci, txns, sd, runAs)) {
+ requestCompaction(ci, runAs, CompactionType.MAJOR);
+ } else if (checkForMinor(ci, txns, sd, runAs)) {
+ requestCompaction(ci, runAs, CompactionType.MINOR);
+ }
+ } catch (Throwable t) {
+ LOG.error("Caught exception while trying to determine if we should compact " +
+ ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " +
+ "" + StringUtils.stringifyException(t));
+ txnHandler.markCleaned(ci);
+ }
+ }
+
+ // Check for timed out remote workers.
+ recoverFailedCompactions(true);
+
+ // Clean anything from the txns table that has no components left in txn_components.
+ txnHandler.cleanEmptyAbortedTxns();
+ } catch (Throwable t) {
+ LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
+ StringUtils.stringifyException(t));
+ }
+
+ long elapsedTime = System.currentTimeMillis() - startedAt;
+ if (elapsedTime >= checkInterval) continue;
+ else Thread.sleep(checkInterval - elapsedTime);
+
+ }
+ } catch (Throwable t) {
+ LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
+ StringUtils.stringifyException(t));
+ }
+ }
+
+ @Override
+ public void init(BooleanPointer stop) throws MetaException {
+ super.init(stop);
+ checkInterval = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL);
+ }
+
+ private void recoverFailedCompactions(boolean remoteOnly) {
+ if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname());
+ txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT));
+ }
+
+ // Figure out if there are any currently running compactions on the same table or partition.
+ private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
+ CompactionInfo ci) {
+ if (compactions.getCompacts() != null) {
+ for (ShowCompactResponseElement e : compactions.getCompacts()) {
+ if (e.getState() != TxnHandler.CLEANING_RESPONSE &&
+ e.getDbname().equals(ci.dbname) &&
+ e.getTablename().equals(ci.tableName) &&
+ (e.getPartitionname() == null && ci.partName == null ||
+ e.getPartitionname().equals(ci.partName))) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean checkForMajor(final CompactionInfo ci,
+ final IMetaStoreClient.ValidTxnList txns,
+ final StorageDescriptor sd,
+ String runAs) throws IOException, InterruptedException {
+ // If it's marked as too many aborted, we already know we need to compact
+ if (ci.tooManyAborts) {
+ LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " +
+ "initiating major compaction");
+ return true;
+ }
+ if (runJobAsSelf(runAs)) {
+ return deltaSizeBigEnough(ci, txns, sd);
+ } else {
+ final List<Boolean> hackery = new ArrayList<Boolean>(1);
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
+ UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ hackery.add(deltaSizeBigEnough(ci, txns, sd));
+ return null;
+ }
+ });
+ return hackery.get(0);
+ }
+ }
+
+ private boolean deltaSizeBigEnough(CompactionInfo ci,
+ IMetaStoreClient.ValidTxnList txns,
+ StorageDescriptor sd) throws IOException {
+ Path location = new Path(sd.getLocation());
+ FileSystem fs = location.getFileSystem(conf);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+ Path base = dir.getBaseDirectory();
+ FileStatus stat = fs.getFileStatus(base);
+ if (!stat.isDir()) {
+ LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!");
+ return false;
+ }
+ long baseSize = sumDirSize(fs, base);
+
+ List<FileStatus> originals = dir.getOriginalFiles();
+ for (FileStatus origStat : originals) {
+ baseSize += origStat.getLen();
+ }
+
+ long deltaSize = 0;
+ List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+ for (AcidUtils.ParsedDelta delta : deltas) {
+ stat = fs.getFileStatus(delta.getPath());
+ if (!stat.isDir()) {
+ LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " +
+ "but it's a file!");
+ return false;
+ }
+ deltaSize += sumDirSize(fs, delta.getPath());
+ }
+
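+ // Request a major compaction when the ratio of delta bytes to base bytes exceeds the
+ // HIVE_COMPACTOR_DELTA_PCT_THRESHOLD setting.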
+ float threshold = HiveConf.getFloatVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+ boolean bigEnough = (float)deltaSize/(float)baseSize > threshold;
+ if (LOG.isDebugEnabled()) {
+ StringBuffer msg = new StringBuffer("delta size: ");
+ msg.append(deltaSize);
+ msg.append(" base size: ");
+ msg.append(baseSize);
+ msg.append(" threshold: ");
+ msg.append(threshold);
+ msg.append(" will major compact: " + bigEnough);
+ LOG.debug(msg);
+ }
+ return bigEnough;
+ }
+
+ private long sumDirSize(FileSystem fs, Path dir) throws IOException {
+ long size = 0;
+ FileStatus[] buckets = fs.listStatus(dir);
+ for (int i = 0; i < buckets.length; i++) {
+ size += buckets[i].getLen();
+ }
+ return size;
+ }
+
+ private boolean checkForMinor(final CompactionInfo ci,
+ final IMetaStoreClient.ValidTxnList txns,
+ final StorageDescriptor sd,
+ String runAs) throws IOException, InterruptedException {
+
+ if (runJobAsSelf(runAs)) {
+ return tooManyDeltas(ci, txns, sd);
+ } else {
+ final List<Boolean> hackery = new ArrayList<Boolean>(1);
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
+ UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ hackery.add(tooManyDeltas(ci, txns, sd));
+ return null;
+ }
+ });
+ return hackery.get(0);
+ }
+ }
+
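+ // A minor compaction is warranted once the number of delta directories exceeds the
+ // HIVE_COMPACTOR_DELTA_NUM_THRESHOLD setting.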
+ private boolean tooManyDeltas(CompactionInfo ci, IMetaStoreClient.ValidTxnList txns,
+ StorageDescriptor sd) throws IOException {
+ Path location = new Path(sd.getLocation());
+ FileSystem fs = location.getFileSystem(conf);
+
+ AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+ List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+ int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+ boolean enough = deltas.size() > threshold;
+ LOG.debug("Found " + deltas.size() + " delta files, threshold is " + threshold +
+ (enough ? "" : "not") + " requesting minor compaction");
+ return enough;
+ }
+
+ private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) {
+ String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName();
+ LOG.info(s);
+ CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type);
+ if (ci.partName != null) rqst.setPartitionname(ci.partName);
+ rqst.setRunas(runAs);
+ txnHandler.compact(rqst);
+ }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
new file mode 100644
index 0000000..1e5797c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+
+/**
+ * A class to do compactions. This will run in a separate thread. It will spin on the
+ * compaction queue and look for new work to do.
+ */
+public class Worker extends CompactorThread {
+ static final private String CLASS_NAME = Worker.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+ static final private long SLEEP_TIME = 5000;
+ static final private int baseThreadNum = 10002;
+
+ private String name;
+
+ /**
+ * Get the hostname that this worker is run on. Made static and public so that other classes
+ * can use the same method to know what host their worker threads are running on.
+ * @return hostname
+ */
+ public static String hostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to resolve my host name " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void run() {
+ // Make sure nothing escapes this run method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
+ try {
+ while (!stop.timeToStop) {
+ CompactionInfo ci = txnHandler.findNextToCompact(name);
+
+ if (ci == null) {
+ try {
+ Thread.sleep(SLEEP_TIME);
+ continue;
+ } catch (InterruptedException e) {
+ LOG.warn("Worker thread sleep interrupted " + e.getMessage());
+ continue;
+ }
+ }
+
+ // Find the table we will be working with.
+ Table t1 = null;
+ try {
+ t1 = resolveTable(ci);
+ } catch (MetaException e) {
+ txnHandler.markCleaned(ci);
+ continue;
+ }
+ // This chicanery is to get around the fact that the table needs to be final in order to
+ // go into the doAs below.
+ final Table t = t1;
+
+ // Find the partition we will be working with, if there is one.
+ Partition p = null;
+ try {
+ p = resolvePartition(ci);
+ } catch (Exception e) {
+ txnHandler.markCleaned(ci);
+ continue;
+ }
+
+ // Find the appropriate storage descriptor
+ final StorageDescriptor sd = resolveStorageDescriptor(t, p);
+
+ // Check that the table or partition isn't sorted, as we don't yet support that.
+ if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+ LOG.error("Attempt to compact sorted table, which is not yet supported!");
+ txnHandler.markCleaned(ci);
+ continue;
+ }
+
+ final boolean isMajor = (ci.type == CompactionType.MAJOR);
+ final IMetaStoreClient.ValidTxnList txns =
+ new HiveMetaStoreClient.ValidTxnListImpl(txnHandler.getOpenTxns());
+ final StringBuffer jobName = new StringBuffer(name);
+ jobName.append("-compactor-");
+ jobName.append(ci.getFullPartitionName());
+
+ // Determine who to run as
+ String runAs;
+ if (ci.runAs == null) {
+ runAs = findUserToRunAs(sd.getLocation(), t);
+ txnHandler.setRunAs(ci.id, runAs);
+ } else {
+ runAs = ci.runAs;
+ }
+
+ LOG.info("Starting " + ci.type.toString() + " compaction for " +
+ ci.getFullPartitionName());
+
+ final CompactorMR mr = new CompactorMR();
+ if (runJobAsSelf(runAs)) {
+ mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+ } else {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+ UserGroupInformation.getLoginUser());
+ ugi.doAs(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+ return null;
+ }
+ });
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Caught an exception in the main loop of compactor worker " + name +
+ ", exiting " + StringUtils.stringifyException(t));
+ }
+
+ }
+
+ @Override
+ public void init(BooleanPointer stop) throws MetaException {
+ super.init(stop);
+
+ StringBuffer name = new StringBuffer(hostname());
+ name.append("-");
+ name.append(getId());
+ this.name = name.toString();
+ setName(name.toString());
+ }
+
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
new file mode 100644
index 0000000..2a91579
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.*;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Super class for all of the compactor test modules.
+ */
+public abstract class CompactorTest {
+ static final private String CLASS_NAME = CompactorTest.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ protected CompactionTxnHandler txnHandler;
+ protected IMetaStoreClient ms;
+ protected long sleepTime = 1000;
+
+ private List<Thread> threads = new ArrayList<Thread>();
+ private MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+ private File tmpdir;
+
+ protected CompactorTest() throws Exception {
+ HiveConf conf = new HiveConf();
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.cleanDb();
+ ms = new HiveMetaStoreClient(conf);
+ txnHandler = new CompactionTxnHandler(conf);
+ tmpdir = new File(System.getProperty("java.io.tmpdir") +
+ System.getProperty("file.separator") + "compactor_test_tables");
+ tmpdir.mkdir();
+ tmpdir.deleteOnExit();
+ }
+
+ protected void startInitiator(HiveConf conf) throws Exception {
+ startThread('i', conf);
+ }
+
+ protected void startWorker(HiveConf conf) throws Exception {
+ startThread('w', conf);
+ }
+
+ protected void startCleaner(HiveConf conf) throws Exception {
+ startThread('c', conf);
+ }
+
+ protected void stopThreads() throws Exception {
+ stop.timeToStop = true;
+ for (Thread t : threads) t.interrupt();
+ }
+
+ protected void joinThreads() throws Exception {
+ stop.timeToStop = true;
+ for (Thread t : threads) t.join();
+ }
+
+ protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
+ return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null);
+ }
+
+ protected Table newTable(String dbName, String tableName, boolean partitioned,
+ Map<String, String> parameters) throws TException {
+ return newTable(dbName, tableName, partitioned, parameters, null);
+
+ }
+
+ protected Table newTable(String dbName, String tableName, boolean partitioned,
+ Map<String, String> parameters, List<Order> sortCols)
+ throws TException {
+ Table table = new Table();
+ table.setTableName(tableName);
+ table.setDbName(dbName);
+ table.setOwner("me");
+ table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols));
+ List<FieldSchema> partKeys = new ArrayList<FieldSchema>(1);
+ if (partitioned) {
+ partKeys.add(new FieldSchema("ds", "string", "no comment"));
+ table.setPartitionKeys(partKeys);
+ }
+
+ table.setParameters(parameters);
+
+ ms.createTable(table);
+ return table;
+ }
+
+ protected Partition newPartition(Table t, String value) throws Exception {
+ return newPartition(t, value, null);
+ }
+
+ protected Partition newPartition(Table t, String value, List<Order> sortCols) throws Exception {
+ Partition part = new Partition();
+ part.addToValues(value);
+ part.setDbName(t.getDbName());
+ part.setTableName(t.getTableName());
+ part.setSd(newStorageDescriptor(getLocation(t.getTableName(), value), sortCols));
+ part.setParameters(new HashMap<String, String>());
+ ms.add_partition(part);
+ return part;
+ }
+
+ protected long openTxn() {
+ List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"),
+ Worker.hostname())).getTxn_ids();
+ return txns.get(0);
+ }
+
+ protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+ int numRecords) throws Exception{
+ addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA);
+ }
+
+ protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
+ int numRecords) throws Exception{
+ addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE);
+ }
+
+ protected void addLegacyFile(HiveConf conf, Table t, Partition p,
+ int numRecords) throws Exception {
+ addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY);
+ }
+
+ protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
+ String partValue = (p == null) ? null : p.getValues().get(0);
+ String location = getLocation(t.getTableName(), partValue);
+ Path dir = new Path(location);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stats = fs.listStatus(dir);
+ List<Path> paths = new ArrayList<Path>(stats.length);
+ for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath());
+ return paths;
+ }
+
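+ // Opens and immediately commits the requested number of transactions, advancing the
+ // transaction id high water mark for subsequent tests.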
+ protected void burnThroughTransactions(int num) throws NoSuchTxnException, TxnAbortedException {
+ OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
+ for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid));
+ }
+
+ private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
+ StorageDescriptor sd = new StorageDescriptor();
+ List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
+ cols.add(new FieldSchema("a", "varchar(25)", "still no comment"));
+ cols.add(new FieldSchema("b", "int", "comment"));
+ sd.setCols(cols);
+ sd.setLocation(location);
+ sd.setInputFormat(MockInputFormat.class.getName());
+ sd.setOutputFormat(MockOutputFormat.class.getName());
+ sd.setNumBuckets(1);
+ SerDeInfo serde = new SerDeInfo();
+ serde.setName(LazySimpleSerDe.class.getName());
+ sd.setSerdeInfo(serde);
+ List<String> bucketCols = new ArrayList<String>(1);
+ bucketCols.add("a");
+ sd.setBucketCols(bucketCols);
+
+ if (sortCols != null) {
+ sd.setSortCols(sortCols);
+ }
+ return sd;
+ }
+
+ // I can't do this with @Before because I want to be able to control the config file provided
+ // to each test.
+ private void startThread(char type, HiveConf conf) throws Exception {
+ TxnDbUtil.setConfValues(conf);
+ CompactorThread t = null;
+ switch (type) {
+ case 'i': t = new Initiator(); break;
+ case 'w': t = new Worker(); break;
+ case 'c': t = new Cleaner(); break;
+ default: throw new RuntimeException("Huh? Unknown thread type.");
+ }
+ threads.add(t);
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ t.init(stop);
+ t.start();
+ }
+
+ private String getLocation(String tableName, String partValue) {
+ String location = tmpdir.getAbsolutePath() +
+ System.getProperty("file.separator") + tableName;
+ if (partValue != null) {
+ location += System.getProperty("file.separator") + "ds=" + partValue;
+ }
+ return location;
+ }
+
+ private enum FileType {BASE, DELTA, LEGACY};
+
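+ // Writes a fake data file using the ACID naming convention: base_<maxTxn>/bucket_00000,
+ // delta_<minTxn>_<maxTxn>/bucket_00000, or a flat legacy file named 00000_0.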
+ private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+ int numRecords, FileType type) throws Exception {
+ String partValue = (p == null) ? null : p.getValues().get(0);
+ Path location = new Path(getLocation(t.getTableName(), partValue));
+ String filename = null;
+ switch (type) {
+ case BASE: filename = "base_" + maxTxn; break;
+ case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+ case LEGACY: filename = "00000_0";
+ }
+
+ FileSystem fs = FileSystem.get(conf);
+ Path partFile = null;
+ if (type == FileType.LEGACY) {
+ partFile = new Path(location, filename);
+ } else {
+ Path dir = new Path(location, filename);
+ fs.mkdirs(dir);
+ partFile = new Path(dir, "bucket_00000");
+ }
+ FSDataOutputStream out = fs.create(partFile);
+ for (int i = 0; i < numRecords; i++) {
+ out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+ }
+ out.close();
+ }
+
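+ // A mock AcidInputFormat for the compactor tests. Its raw reader simply replays the text lines found in the
+ // base (or legacy) file and in the bucket file of each delta directory.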
+ static class MockInputFormat implements AcidInputFormat {
+
+ @Override
+ public RowReader getReader(InputSplit split,
+ Options options) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public RawReader getRawReader(Configuration conf, boolean collapseEvents, int bucket,
+ Path baseDirectory, Path... deltaDirectory) throws IOException {
+
+ List<Path> filesToRead = new ArrayList<Path>();
+ if (baseDirectory != null) {
+ if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+ filesToRead.add(AcidUtils.createBucketFile(baseDirectory, bucket));
+ } else {
+ filesToRead.add(new Path(baseDirectory, "00000_0"));
+
+ }
+ }
+ for (int i = 0; i < deltaDirectory.length; i++) {
+ filesToRead.add(AcidUtils.createBucketFile(deltaDirectory[i], bucket));
+ }
+ return new MockRawReader(conf, filesToRead);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
+ return new InputSplit[0];
+ }
+
+ @Override
+ public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries,
+ Reporter reporter) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+ IOException {
+ return false;
+ }
+ }
+
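+ // Pops the files it was given off a stack and returns their lines one at a time as Text values.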
+ static class MockRawReader implements AcidInputFormat.RawReader {
+ private Stack<Path> filesToRead;
+ private Configuration conf;
+ private FSDataInputStream is = null;
+ private FileSystem fs;
+
+ MockRawReader(Configuration conf, List<Path> files) throws IOException {
+ filesToRead = new Stack<Path>();
+ for (Path file : files) filesToRead.push(file);
+ this.conf = conf;
+ fs = FileSystem.get(conf);
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() {
+ return null;
+ }
+
+ @Override
+ public boolean next(RecordIdentifier identifier, Text text) throws IOException {
+ if (is == null) {
+ // Open the next file
+ if (filesToRead.empty()) return false;
+ Path p = filesToRead.pop();
+ LOG.debug("Reading records from " + p.toString());
+ is = fs.open(p);
+ }
+ String line = is.readLine();
+ if (line == null) {
+ // Set our current entry to null (since it's done) and try again.
+ is = null;
+ return next(identifier, text);
+ }
+ text.set(line);
+ return true;
+ }
+
+ @Override
+ public RecordIdentifier createKey() {
+ return new RecordIdentifier();
+ }
+
+ @Override
+ public Text createValue() {
+ return new Text();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return 0;
+ }
+ }
+
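+ // A mock AcidOutputFormat whose raw record writer dumps each row as a line of text into a single file.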
+ static class MockOutputFormat implements AcidOutputFormat {
+
+ @Override
+ public RecordUpdater getRecordUpdater(Path path, Options options) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public FSRecordWriter getRawRecordWriter(Path path, Options options) throws IOException {
+ return new MockRecordWriter(path, options);
+ }
+
+ @Override
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed, Properties tableProperties,
+ Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public RecordWriter getRecordWriter(FileSystem fileSystem, JobConf entries,
+ String s,
+ Progressable progressable) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException {
+
+ }
+ }
+
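+ // Writes each Text value it is handed as one line in the output file.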
+ static class MockRecordWriter implements FSRecordWriter {
+ private FSDataOutputStream os;
+
+ MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException {
+ FileSystem fs = FileSystem.get(options.getConfiguration());
+ os = fs.create(basedir);
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ Text t = (Text)w;
+ os.writeBytes(t.toString());
+ os.writeBytes("\n");
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ os.close();
+ }
+ }
+
+
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
new file mode 100644
index 0000000..b55805a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for the compactor Cleaner thread
+ */
+public class TestCleaner extends CompactorTest {
+ public TestCleaner() throws Exception {
+ super();
+ }
+
+ @Test
+ public void nothing() throws Exception {
+ // Test that the whole thing works when there's nothing in the queue. This is just a
+ // survival test.
+ startCleaner(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ stopThreads();
+ }
+
+ @Test
+ public void cleanupAfterMajorTableCompaction() throws Exception {
+ Table t = newTable("default", "camtc", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(conf, t, null, 25L, 25);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ startCleaner(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ // Check there are no compaction requests left.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+
+ // Check that the files are removed
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(1, paths.size());
+ Assert.assertEquals("base_25", paths.get(0).getName());
+ stopThreads();
+ }
+
+ @Test
+ public void cleanupAfterMajorPartitionCompaction() throws Exception {
+ Table t = newTable("default", "campc", true);
+ Partition p = newPartition(t, "today");
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(conf, t, p, 25L, 25);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ startCleaner(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ // Check there are no compaction requests left.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+
+ // Check that the files are removed
+ List<Path> paths = getDirectories(conf, t, p);
+ Assert.assertEquals(1, paths.size());
+ Assert.assertEquals("base_25", paths.get(0).getName());
+ stopThreads();
+ }
+
+ @Test
+ public void cleanupAfterMinorTableCompaction() throws Exception {
+ Table t = newTable("default", "camitc", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ startCleaner(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ // Check there are no compaction requests left.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+
+ // Check that the files are removed
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(2, paths.size());
+ boolean sawBase = false, sawDelta = false;
+ for (Path p : paths) {
+ if (p.getName().equals("base_20")) sawBase = true;
+ else if (p.getName().equals("delta_21_24")) sawDelta = true;
+ else Assert.fail("Unexpected file " + p.getName());
+ }
+ Assert.assertTrue(sawBase);
+ Assert.assertTrue(sawDelta);
+ stopThreads();
+ }
+
+ @Test
+ public void cleanupAfterMinorPartitionCompaction() throws Exception {
+ Table t = newTable("default", "camipc", true);
+ Partition p = newPartition(t, "today");
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ startCleaner(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ // Check there are no compaction requests left.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+
+ // Check that the files are removed
+ List<Path> paths = getDirectories(conf, t, p);
+ Assert.assertEquals(2, paths.size());
+ boolean sawBase = false, sawDelta = false;
+ for (Path path : paths) {
+ if (path.getName().equals("base_20")) sawBase = true;
+ else if (path.getName().equals("delta_21_24")) sawDelta = true;
+ else Assert.fail("Unexpected file " + path.getName());
+ }
+ Assert.assertTrue(sawBase);
+ Assert.assertTrue(sawDelta);
+ stopThreads();
+ }
+
+ @Test
+ public void blockedByLockTable() throws Exception {
+ Table t = newTable("default", "bblt", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE,
+ "default", "bblt", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+
+ startCleaner(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ // The compaction request should still be there, blocked by the existing lock.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ Assert.assertEquals("bblt", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void blockedByLockPartition() throws Exception {
+ Table t = newTable("default", "bblp", true);
+ Partition p = newPartition(t, "today");
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "bblp", "ds=today");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+
+ startCleaner(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ // The compaction request should still be there, blocked by the existing lock.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ Assert.assertEquals("bblp", compacts.get(0).getTablename());
+ Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+ Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Before
+ public void setUpTxnDb() throws Exception {
+ TxnDbUtil.setConfValues(new HiveConf());
+ TxnDbUtil.prepDb();
+ }
+
+ @After
+ public void tearDownTxnDb() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
new file mode 100644
index 0000000..e85e762
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the compactor Initiator thread.
+ */
+public class TestInitiator extends CompactorTest {
+ static final private String CLASS_NAME = TestInitiator.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ public TestInitiator() throws Exception {
+ super();
+ }
+
+ @Test
+ public void nothing() throws Exception {
+ // Test that the whole thing works when there's nothing in the queue. This is just a
+ // survival test.
+ startInitiator(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ stopThreads();
+ }
+
+ @Test
+ public void recoverFailedLocalWorkers() throws Exception {
+ Table t = newTable("default", "rflw1", false);
+ CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ t = newTable("default", "rflw2", false);
+ rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ txnHandler.findNextToCompact(Worker.hostname() + "-193892");
+ txnHandler.findNextToCompact("nosuchhost-193892");
+
+ startInitiator(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(2, compacts.size());
+ boolean sawInitiated = false;
+ for (ShowCompactResponseElement c : compacts) {
+ if (c.getState().equals("working")) {
+ Assert.assertEquals("nosuchhost-193892", c.getWorkerid());
+ } else if (c.getState().equals("initiated")) {
+ sawInitiated = true;
+ } else {
+ Assert.fail("Unexpected state");
+ }
+ }
+ Assert.assertTrue(sawInitiated);
+ stopThreads();
+ }
+
+ @Test
+ public void recoverFailedRemoteWorkers() throws Exception {
+ Table t = newTable("default", "rfrw1", false);
+ CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ txnHandler.findNextToCompact("nosuchhost-193892");
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L);
+ Thread.sleep(sleepTime); // let the stale worker entry age past the shortened timeout
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ stopThreads();
+ }
+
+ @Test
+ public void majorCompactOnTableTooManyAborts() throws Exception {
+ Table t = newTable("default", "mcottma", false);
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+ for (int i = 0; i < 11; i++) {
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "mcottma", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("mcottma", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void majorCompactOnPartitionTooManyAborts() throws Exception {
+ Table t = newTable("default", "mcoptma", true);
+ Partition p = newPartition(t, "today");
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+ for (int i = 0; i < 11; i++) {
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "mcoptma", "ds=today");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("mcoptma", compacts.get(0).getTablename());
+ Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void noCompactOnManyDifferentPartitionAborts() throws Exception {
+ Table t = newTable("default", "ncomdpa", true);
+ for (int i = 0; i < 11; i++) {
+ Partition p = newPartition(t, "day-" + i);
+ }
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+ for (int i = 0; i < 11; i++) {
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "ncomdpa", "ds=day-" + i);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+ stopThreads();
+ }
+
+ @Test
+ public void cleanEmptyAbortedTxns() throws Exception {
+ // Test that we are cleaning aborted transactions with no components left in txn_components.
+ // Put one aborted transaction with an entry in txn_components to make sure we don't
+ // accidentally clean it too.
+ Table t = newTable("default", "ceat", false);
+
+ HiveConf conf = new HiveConf();
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "ceat", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+ for (int i = 0; i < 100; i++) {
+ txnid = openTxn();
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(101, openTxns.getOpen_txnsSize());
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(1, openTxns.getOpen_txnsSize());
+ stopThreads();
+ }
+
+ @Test
+ public void noCompactWhenNoCompactSet() throws Exception {
+ Map<String, String> parameters = new HashMap<String, String>(1);
+ parameters.put("NO_AUTO_COMPACTION", "true");
+ Table t = newTable("default", "ncwncs", false, parameters);
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+ for (int i = 0; i < 11; i++) {
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "ncwncs", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+ stopThreads();
+ }
+
+ @Test
+ public void noCompactWhenCompactAlreadyScheduled() throws Exception {
+ Table t = newTable("default", "ncwcas", false);
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+ for (int i = 0; i < 11; i++) {
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "ncwcas", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+
+ CompactionRequest rqst = new CompactionRequest("default", "ncwcas", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("ncwcas", compacts.get(0).getTablename());
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("ncwcas", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void compactTableHighDeltaPct() throws Exception {
+ Table t = newTable("default", "cthdp", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+ burnThroughTransactions(23);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "cthdp", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("cthdp", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void compactPartitionHighDeltaPct() throws Exception {
+ Table t = newTable("default", "cphdp", true);
+ Partition p = newPartition(t, "today");
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+ burnThroughTransactions(23);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION,
+ "default", "cphdp", "ds=today");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("cphdp", compacts.get(0).getTablename());
+ Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void noCompactTableDeltaPctNotHighEnough() throws Exception {
+ Table t = newTable("default", "nctdpnhe", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 50L, 50);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+ burnThroughTransactions(53);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "nctdpnhe", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+ stopThreads();
+ }
+
+ @Test
+ public void compactTableTooManyDeltas() throws Exception {
+ Table t = newTable("default", "cttmd", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 200L, 200);
+ addDeltaFile(conf, t, null, 201L, 201L, 1);
+ addDeltaFile(conf, t, null, 202L, 202L, 1);
+ addDeltaFile(conf, t, null, 203L, 203L, 1);
+ addDeltaFile(conf, t, null, 204L, 204L, 1);
+ addDeltaFile(conf, t, null, 205L, 205L, 1);
+ addDeltaFile(conf, t, null, 206L, 206L, 1);
+ addDeltaFile(conf, t, null, 207L, 207L, 1);
+ addDeltaFile(conf, t, null, 208L, 208L, 1);
+ addDeltaFile(conf, t, null, 209L, 209L, 1);
+ addDeltaFile(conf, t, null, 210L, 210L, 1);
+ addDeltaFile(conf, t, null, 211L, 211L, 1);
+
+ burnThroughTransactions(210);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "cttmd", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("cttmd", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void compactPartitionTooManyDeltas() throws Exception {
+ Table t = newTable("default", "cptmd", true);
+ Partition p = newPartition(t, "today");
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 200L, 200);
+ addDeltaFile(conf, t, p, 201L, 201L, 1);
+ addDeltaFile(conf, t, p, 202L, 202L, 1);
+ addDeltaFile(conf, t, p, 203L, 203L, 1);
+ addDeltaFile(conf, t, p, 204L, 204L, 1);
+ addDeltaFile(conf, t, p, 205L, 205L, 1);
+ addDeltaFile(conf, t, p, 206L, 206L, 1);
+ addDeltaFile(conf, t, p, 207L, 207L, 1);
+ addDeltaFile(conf, t, p, 208L, 208L, 1);
+ addDeltaFile(conf, t, p, 209L, 209L, 1);
+ addDeltaFile(conf, t, p, 210L, 210L, 1);
+ addDeltaFile(conf, t, p, 211L, 211L, 1);
+
+ burnThroughTransactions(210);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION,
+ "default", "cptmd", "ds=today");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("cptmd", compacts.get(0).getTablename());
+ Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+ Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ @Test
+ public void noCompactTableNotEnoughDeltas() throws Exception {
+ Table t = newTable("default", "nctned", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 200L, 200);
+ addDeltaFile(conf, t, null, 201L, 205L, 5);
+ addDeltaFile(conf, t, null, 206L, 211L, 6);
+
+ burnThroughTransactions(210);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "nctned", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertNull(rsp.getCompacts());
+ stopThreads();
+ }
+
+ @Test
+ public void chooseMajorOverMinorWhenBothValid() throws Exception {
+ Table t = newTable("default", "cmomwbv", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 200L, 200);
+ addDeltaFile(conf, t, null, 201L, 211L, 11);
+ addDeltaFile(conf, t, null, 212L, 222L, 11);
+ addDeltaFile(conf, t, null, 223L, 233L, 11);
+ addDeltaFile(conf, t, null, 234L, 244L, 11);
+ addDeltaFile(conf, t, null, 245L, 255L, 11);
+ addDeltaFile(conf, t, null, 256L, 266L, 11);
+ addDeltaFile(conf, t, null, 267L, 277L, 11);
+ addDeltaFile(conf, t, null, 278L, 288L, 11);
+ addDeltaFile(conf, t, null, 289L, 299L, 11);
+ addDeltaFile(conf, t, null, 300L, 310L, 11);
+ addDeltaFile(conf, t, null, 311L, 321L, 11);
+
+ burnThroughTransactions(320);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
+ "default", "cmomwbv", null);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("cmomwbv", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ stopThreads();
+ }
+
+ // TODO test compactions with legacy file types
+
+ @Before
+ public void setUpTxnDb() throws Exception {
+ TxnDbUtil.setConfValues(new HiveConf());
+ TxnDbUtil.prepDb();
+ }
+
+ @After
+ public void tearDownTxnDb() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
new file mode 100644
index 0000000..3c21a5d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the worker thread and its MR jobs.
+ */
+public class TestWorker extends CompactorTest {
+ static final private String CLASS_NAME = TestWorker.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ public TestWorker() throws Exception {
+ super();
+ }
+
+ @Test
+ public void nothing() throws Exception {
+ // Test that the whole thing works when there's nothing in the queue. This is just a
+ // survival test.
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ stopThreads();
+ }
+
+ @Test
+ public void stringableMap() throws Exception {
+ // Empty map case
+ CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>());
+ String s = m.toString();
+ Assert.assertEquals("0:", s);
+ m = new CompactorMR.StringableMap(s);
+ Assert.assertEquals(0, m.size());
+
+ Map<String, String> base = new HashMap<String, String>();
+ base.put("mary", "poppins");
+ base.put("bert", null);
+ base.put(null, "banks");
+ m = new CompactorMR.StringableMap(base);
+ s = m.toString();
+ m = new CompactorMR.StringableMap(s);
+ Assert.assertEquals(3, m.size());
+ Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
+ saw.put("mary", false);
+ saw.put("bert", false);
+ saw.put(null, false);
+ for (Map.Entry<String, String> e : m.entrySet()) {
+ saw.put(e.getKey(), true);
+ if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue());
+ else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue());
+ else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue());
+ else Assert.fail("Unexpected value " + e.getKey());
+ }
+ Assert.assertEquals(3, saw.size());
+ Assert.assertTrue(saw.get("mary"));
+ Assert.assertTrue(saw.get("bert"));
+ Assert.assertTrue(saw.get(null));
+ }
+
+ @Test
+ public void stringableList() throws Exception {
+ // Empty list case
+ CompactorMR.StringableList ls = new CompactorMR.StringableList();
+ String s = ls.toString();
+ Assert.assertEquals("0:", s);
+ ls = new CompactorMR.StringableList(s);
+ Assert.assertEquals(0, ls.size());
+
+ ls = new CompactorMR.StringableList();
+ ls.add(new Path("/tmp"));
+ ls.add(new Path("/usr"));
+ s = ls.toString();
+ Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
+ "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s));
+ ls = new CompactorMR.StringableList(s);
+ Assert.assertEquals(2, ls.size());
+ boolean sawTmp = false, sawUsr = false;
+ for (Path p : ls) {
+ if ("/tmp".equals(p.toString())) sawTmp = true;
+ else if ("/usr".equals(p.toString())) sawUsr = true;
+ else Assert.fail("Unexpected path " + p.toString());
+ }
+ Assert.assertTrue(sawTmp);
+ Assert.assertTrue(sawUsr);
+ }
+
+ @Test
+ public void inputSplit() throws Exception {
+ String basename = "/warehouse/foo/base_1";
+ String delta1 = "/warehouse/foo/delta_2_3";
+ String delta2 = "/warehouse/foo/delta_4_7";
+
+ HiveConf conf = new HiveConf();
+ Path file = new Path(System.getProperty("java.io.tmpdir") +
+ System.getProperty("file.separator") + "newWriteInputSplitTest");
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream os = fs.create(file);
+ for (int i = 0; i < 10; i++) {
+ os.writeBytes("mary had a little lamb its fleece was white as snow\n");
+ }
+ os.close();
+ List<Path> files = new ArrayList<Path>(1);
+ files.add(file);
+
+ Path[] deltas = new Path[2];
+ deltas[0] = new Path(delta1);
+ deltas[1] = new Path(delta2);
+
+ CompactorMR.CompactorInputSplit split =
+ new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas);
+
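+ // The split covers one file of 10 records at 52 bytes each, so its length should be 520 bytes.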
+ Assert.assertEquals(520L, split.getLength());
+ String[] locations = split.getLocations();
+ Assert.assertEquals(1, locations.length);
+ Assert.assertEquals("localhost", locations[0]);
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(buf);
+ split.write(out);
+
+ split = new CompactorMR.CompactorInputSplit();
+ DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+ split.readFields(in);
+
+ Assert.assertEquals(3, split.getBucket());
+ Assert.assertEquals(basename, split.getBaseDir().toString());
+ deltas = split.getDeltaDirs();
+ Assert.assertEquals(2, deltas.length);
+ Assert.assertEquals(delta1, deltas[0].toString());
+ Assert.assertEquals(delta2, deltas[1].toString());
+ }
+
+ @Test
+ public void inputSplitNullBase() throws Exception {
+ String delta1 = "/warehouse/foo/delta_2_3";
+ String delta2 = "/warehouse/foo/delta_4_7";
+
+ HiveConf conf = new HiveConf();
+ Path file = new Path(System.getProperty("java.io.tmpdir") +
+ System.getProperty("file.separator") + "newWriteInputSplitTest");
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream os = fs.create(file);
+ for (int i = 0; i < 10; i++) {
+ os.writeBytes("mary had a little lamb its fleece was white as snow\n");
+ }
+ os.close();
+ List<Path> files = new ArrayList<Path>(1);
+ files.add(file);
+
+ Path[] deltas = new Path[2];
+ deltas[0] = new Path(delta1);
+ deltas[1] = new Path(delta2);
+
+ CompactorMR.CompactorInputSplit split =
+ new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas);
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(buf);
+ split.write(out);
+
+ split = new CompactorMR.CompactorInputSplit();
+ DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+ split.readFields(in);
+
+ Assert.assertEquals(3, split.getBucket());
+ Assert.assertNull(split.getBaseDir());
+ deltas = split.getDeltaDirs();
+ Assert.assertEquals(2, deltas.length);
+ Assert.assertEquals(delta1, deltas[0].toString());
+ Assert.assertEquals(delta2, deltas[1].toString());
+ }
+
+ @Test
+ public void sortedTable() throws Exception {
+ List<Order> sortCols = new ArrayList<Order>(1);
+ sortCols.add(new Order("b", 1));
+
+ Table t = newTable("default", "st", false, new HashMap(), sortCols);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ stopThreads();
+
+ // There should still be four directories in the location.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ Assert.assertEquals(4, stat.length);
+ }
+
+ @Test
+ public void sortedPartition() throws Exception {
+ List<Order> sortCols = new ArrayList<Order>(1);
+ sortCols.add(new Order("b", 1));
+
+ Table t = newTable("default", "sp", true, new HashMap(), sortCols);
+ Partition p = newPartition(t, "today", sortCols);
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "sp", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ stopThreads();
+
+ // There should still be four directories in the location.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+ Assert.assertEquals(4, stat.length);
+ }
+
+ @Test
+ public void minorTableWithBase() throws Exception {
+ LOG.debug("Starting minorTableWithBase");
+ Table t = newTable("default", "mtwb", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ startWorker(conf);
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be four directories in the location: the original base, the two original deltas, and the new compacted delta.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ Assert.assertEquals(4, stat.length);
+
+ // Find the new delta file and make sure it has the right contents
+ boolean sawNewDelta = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(208L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewDelta);
+ }
+
+ @Test
+ public void minorPartitionWithBase() throws Exception {
+ Table t = newTable("default", "mpwb", true);
+ Partition p = newPartition(t, "today");
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "mpwb", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be four directories in the location.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+ Assert.assertEquals(4, stat.length);
+
+ // Find the new delta file and make sure it has the right contents
+ boolean sawNewDelta = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(208L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewDelta);
+ }
+
+ @Test
+ public void minorTableNoBase() throws Exception {
+ LOG.debug("Starting minorTableWithBase");
+ Table t = newTable("default", "mtnb", false);
+
+ HiveConf conf = new HiveConf();
+
+ addDeltaFile(conf, t, null, 1L, 2L, 2);
+ addDeltaFile(conf, t, null, 3L, 4L, 2);
+
+ burnThroughTransactions(5);
+
+ CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be three directories in the location: the two original deltas and the new compacted delta.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ Assert.assertEquals(3, stat.length);
+
+ // Find the new delta file and make sure it has the right contents
+ boolean sawNewDelta = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("delta_0000001_0000004")) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(208L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewDelta);
+ }
+
+ @Test
+ public void majorTableWithBase() throws Exception {
+ LOG.debug("Starting majorTableWithBase");
+ Table t = newTable("default", "matwb", false);
+
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, null, 20L, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "matwb", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be four directories in the location: the original base, the two deltas, and the new base.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ Assert.assertEquals(4, stat.length);
+
+ // Find the new base file and make sure it has the right contents
+ boolean sawNewBase = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("base_0000024")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(1248L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ }
+
+ @Test
+ public void majorPartitionWithBase() throws Exception {
+ Table t = newTable("default", "mapwb", true);
+ Partition p = newPartition(t, "today");
+ HiveConf conf = new HiveConf();
+
+ addBaseFile(conf, t, p, 20L, 20);
+ addDeltaFile(conf, t, p, 21L, 22L, 2);
+ addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "mapwb", CompactionType.MAJOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be four directories in the location.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+ Assert.assertEquals(4, stat.length);
+
+ // Find the new base file and make sure it has the right contents
+ boolean sawNewBase = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("base_0000024")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(1248L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ }
+
+ @Test
+ public void majorTableNoBase() throws Exception {
+ LOG.debug("Starting majorTableNoBase");
+ Table t = newTable("default", "matnb", false);
+
+ HiveConf conf = new HiveConf();
+
+ addDeltaFile(conf, t, null, 1L, 2L, 2);
+ addDeltaFile(conf, t, null, 3L, 4L, 2);
+
+ burnThroughTransactions(5);
+
+ CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be three directories in the location: the two deltas and the new base.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ Assert.assertEquals(3, stat.length);
+
+ // Find the new base file and make sure it has the right contents
+ boolean sawNewBase = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("base_0000004")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(208L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ }
+
+ @Test
+ public void majorTableLegacy() throws Exception {
+ LOG.debug("Starting majorTableLegacy");
+ Table t = newTable("default", "matl", false);
+
+ HiveConf conf = new HiveConf();
+
+ addLegacyFile(conf, t, null, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be four entries in the location, though the count check is currently disabled.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ //Assert.assertEquals(4, stat.length);
+
+ // Find the new base file and make sure it has the right contents
+ boolean sawNewBase = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("base_0000024")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(1248L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ }
+
+ @Test
+ public void minorTableLegacy() throws Exception {
+ LOG.debug("Starting minorTableLegacy");
+ Table t = newTable("default", "mtl", false);
+
+ HiveConf conf = new HiveConf();
+
+ addLegacyFile(conf, t, null, 20);
+ addDeltaFile(conf, t, null, 21L, 22L, 2);
+ addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ startWorker(new HiveConf());
+ Thread.sleep(sleepTime); // should be long enough to get through the loop
+ joinThreads();
+
+ // There should now be four entries in the location, though the count check is currently disabled.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+ //Assert.assertEquals(4, stat.length);
+
+ // Find the new delta file and make sure it has the right contents
+ boolean sawNewDelta = false;
+ for (int i = 0; i < stat.length; i++) {
+ if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals(208L, buckets[0].getLen());
+ } else {
+ LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewDelta);
+ }
+
+ @Before
+ public void setUpTxnDb() throws Exception {
+ TxnDbUtil.setConfValues(new HiveConf());
+ TxnDbUtil.prepDb();
+ }
+
+ @After
+ public void tearDownTxnDb() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+}