diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 18e62d8..c62e085 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -42,6 +42,10 @@ import java.util.Properties; import java.util.Set; import java.util.Timer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import org.apache.commons.cli.OptionBuilder; @@ -5074,7 +5078,12 @@ public void run() { } }); - startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf); + Lock startLock = new ReentrantLock(); + Condition startCondition = startLock.newCondition(); + MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer(); + startMetaStoreThreads(conf, startLock, startCondition, startedServing); + startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock, + startCondition, startedServing); } catch (Throwable t) { // Catch the exception, log it and rethrow it. HMSHandler.LOG @@ -5092,7 +5101,19 @@ public void run() { */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge) throws Throwable { - startMetaStore(port, bridge, new HiveConf(HMSHandler.class)); + startMetaStore(port, bridge, new HiveConf(HMSHandler.class), null, null, null); + } + + /** + * Start the metastore. + * @param port + * @param bridge + * @param conf + * @throws Throwable + */ + public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, + HiveConf conf) throws Throwable { + startMetaStore(port, bridge, conf, null, null, null); } /** @@ -5105,7 +5126,8 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge) * @throws Throwable */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, - HiveConf conf) throws Throwable { + HiveConf conf, Lock startLock, Condition startCondition, + MetaStoreThread.BooleanPointer startedServing) throws Throwable { try { // Server will create new threads up to max as necessary.
After an idle @@ -5173,6 +5195,10 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, HMSHandler.LOG.info("Options.maxWorkerThreads = " + maxWorkerThreads); HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive); + + if (startLock != null) { + signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing); + } tServer.serve(); } catch (Throwable x) { x.printStackTrace(); @@ -5180,4 +5206,119 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, throw x; } } + + private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, + final Condition startCondition, + final MetaStoreThread.BooleanPointer startedServing) { + // A simple thread to wait until the server has started and then signal the other threads to + // begin + Thread t = new Thread() { + @Override + public void run() { + do { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn("Signalling thread was interrupted: " + e.getMessage()); + } + } while (!server.isServing()); + startLock.lock(); + try { + startedServing.boolVal = true; + startCondition.signalAll(); + } finally { + startLock.unlock(); + } + } + }; + t.start(); + } + + /** + * Start threads outside of the thrift service, such as the compactor threads. + * @param conf Hive configuration object + */ + private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock, + final Condition startCondition, final + MetaStoreThread.BooleanPointer startedServing) { + // A thread is spun up to start these other threads. That's because we can't start them + // until after the TServer has started, but once TServer.serve is called we aren't given back + // control. + Thread t = new Thread() { + @Override + public void run() { + // This is a massive hack. The compactor threads have to access packages in ql (such as + // AcidInputFormat). ql depends on metastore so we can't directly access those. To deal + // with this the compactor thread classes have been put in ql and they are instantiated here + // dynamically. This is not ideal but it avoids a massive refactoring of Hive packages. + // + // Wrap the start of the threads in a catch Throwable block so that any failures + // don't doom the rest of the metastore. + startLock.lock(); + try { + // Per the javadocs on Condition, do not depend on the condition alone as a start gate + // since spurious wake ups are possible.
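        // A minimal sketch of the guarded-wait idiom this relies on, assuming a shared Lock
        // "lock", a Condition "cond", and a boolean flag "ready" that the signalling side sets
        // while holding the same lock:
        //
        //   lock.lock();
        //   try {
        //     while (!ready) {
        //       cond.await();          // loop: re-check the flag after every wakeup
        //     }
        //   } finally {
        //     lock.unlock();
        //   }
        //
        // The await loop below is this pattern, with startedServing.boolVal as the flag.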
+ while (!startedServing.boolVal) startCondition.await(); + startCompactorInitiator(conf); + startCompactorWorkers(conf); + startCompactorCleaner(conf); + } catch (Throwable e) { + LOG.error("Failure when starting the compactor, compactions may not happen, " + + StringUtils.stringifyException(e)); + } finally { + startLock.unlock(); + } + } + }; + + t.start(); + } + + private static void startCompactorInitiator(HiveConf conf) throws Exception { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) { + MetaStoreThread initiator = + instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator"); + initializeAndStartThread(initiator, conf); + } + } + + private static void startCompactorWorkers(HiveConf conf) throws Exception { + int numWorkers = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_THREADS); + for (int i = 0; i < numWorkers; i++) { + MetaStoreThread worker = + instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker"); + initializeAndStartThread(worker, conf); + } + } + + private static void startCompactorCleaner(HiveConf conf) throws Exception { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) { + MetaStoreThread cleaner = + instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner"); + initializeAndStartThread(cleaner, conf); + } + } + + private static MetaStoreThread instantiateThread(String classname) throws Exception { + Class c = Class.forName(classname); + Object o = c.newInstance(); + if (MetaStoreThread.class.isAssignableFrom(o.getClass())) { + return (MetaStoreThread)o; + } else { + String s = classname + " is not an instance of MetaStoreThread."; + LOG.error(s); + throw new IOException(s); + } + } + + private static int nextThreadId = 1000000; + + private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws + MetaException { + LOG.info("Starting metastore thread of type " + thread.getClass().getName()); + thread.setHiveConf(conf); + thread.setThreadId(nextThreadId++); + thread.init(new MetaStoreThread.BooleanPointer()); + thread.start(); + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java new file mode 100644 index 0000000..6e18a5b --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; + +/** + * A thread that runs in the metastore, separate from the threads in the thrift service. 
+ */ +public interface MetaStoreThread { + + /** + * Set the Hive configuration for this thread. + * @param conf + */ + void setHiveConf(HiveConf conf); + + /** + * Set the id for this thread. + * @param threadId + */ + void setThreadId(int threadId); + + /** + * Initialize the thread. This must not be called until after + * {@link #setHiveConf(org.apache.hadoop.hive.conf.HiveConf)} and {@link #setThreadId(int)} + * have been called. + * @param stop a flag to watch for when to stop. If this value is set to true, + * the thread will terminate the next time through its main loop. + */ + void init(BooleanPointer stop) throws MetaException; + + /** + * Run the thread in the background. This must not be called until + * {@link #init(org.apache.hadoop.hive.metastore.MetaStoreThread.BooleanPointer)} has + * been called. + */ + void start(); + + class BooleanPointer { + public boolean boolVal; + + public BooleanPointer() { + boolVal = false; + } + } + +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 838015a..1ea3541 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -51,13 +51,14 @@ public CompactionTxnHandler(HiveConf conf) { * @return list of CompactionInfo structs. These will not have id, type, * or runAs set since these are only potential compactions not actual ones. */ - public List findPotentialCompactions(int maxAborted) throws MetaException { + public Set findPotentialCompactions(int maxAborted) throws MetaException { Connection dbConn = getDbConn(); - List response = new ArrayList(); + Set response = new HashSet(); try { Statement stmt = dbConn.createStatement(); // Check for completed transactions - String s = "select ctc_database, ctc_table, ctc_partition from COMPLETED_TXN_COMPONENTS"; + String s = "select distinct ctc_database, ctc_table, " + + "ctc_partition from COMPLETED_TXN_COMPONENTS"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index a15a210..5d6f066 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import static junit.framework.Assert.*; @@ -317,7 +318,7 @@ public void testFindPotentialCompactions() throws Exception { txnHandler.commitTxn(new CommitTxnRequest(txnid)); assertEquals(0, txnHandler.numLocksInLockTable()); - List potentials = txnHandler.findPotentialCompactions(100); + Set potentials = txnHandler.findPotentialCompactions(100); assertEquals(2, potentials.size()); boolean sawMyTable = false, sawYourTable = false; for (CompactionInfo ci : potentials) { diff --git ql/pom.xml ql/pom.xml index 954d7e6..5e1151c 100644 --- ql/pom.xml +++ ql/pom.xml @@ -352,6 +352,13 @@ org.apache.hadoop + hadoop-mapreduce-client-common + ${hadoop-23.version} + true + test + + + org.apache.hadoop hadoop-hdfs ${hadoop-23.version} true diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 
f4f0631..d8dc470 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -51,18 +51,29 @@ private AcidUtils() { public static final String DELTA_PREFIX = "delta_"; public static final String BUCKET_PREFIX = "bucket_"; - private static final String BUCKET_DIGITS = "%05d"; - private static final String DELTA_DIGITS = "%07d"; + public static final String BUCKET_DIGITS = "%05d"; + public static final String DELTA_DIGITS = "%07d"; private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); + public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); + public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}"); + public static final PathFilter hiddenFileFilter = new PathFilter(){ public boolean accept(Path p){ String name = p.getName(); return !name.startsWith("_") && !name.startsWith("."); } }; + + public static final PathFilter bucketFileFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(BUCKET_PREFIX); + } + }; + private static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 89b9b68..38a0d6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -118,12 +118,16 @@ public int compareTo(RecordIdentifier other) { @Override public void write(DataOutput dataOutput) throws IOException { - throw new UnsupportedOperationException("Can't write RecordIdentifier"); + dataOutput.writeLong(transactionId); + dataOutput.writeInt(bucketId); + dataOutput.writeLong(rowId); } @Override public void readFields(DataInput dataInput) throws IOException { - throw new UnsupportedOperationException("Can't read RecordIdentifier"); + transactionId = dataInput.readLong(); + bucketId = dataInput.readInt(); + rowId = dataInput.readLong(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java new file mode 100644 index 0000000..18bb2c0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; + +/** + * A class to clean directories after compactions. This will run in a separate thread. + */ +public class Cleaner extends CompactorThread { + static final private String CLASS_NAME = Cleaner.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + private long cleanerCheckInterval = 5000; + + @Override + public void run() { + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. + do { + try { + long startedAt = System.currentTimeMillis(); + + // Now look for new entries ready to be cleaned. + List toClean = txnHandler.findReadyToClean(); + for (CompactionInfo ci : toClean) { + LockComponent comp = null; + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname); + comp.setTablename(ci.tableName); + if (ci.partName != null) comp.setPartitionname(ci.partName); + List components = new ArrayList(1); + components.add(comp); + LockRequest rqst = new LockRequest(components, System.getProperty("user.name"), + Worker.hostname()); + LockResponse rsp = txnHandler.lockNoWait(rqst); + try { + if (rsp.getState() == LockState.ACQUIRED) { + clean(ci); + } + } finally { + if (rsp.getState() == LockState.ACQUIRED) { + txnHandler.unlock(new UnlockRequest(rsp.getLockid())); + } + } + } + + // Now, go back to bed until it's time to do this again + long elapsedTime = System.currentTimeMillis() - startedAt; + if (elapsedTime >= cleanerCheckInterval || stop.boolVal) continue; + else Thread.sleep(cleanerCheckInterval - elapsedTime); + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor cleaner, " + + StringUtils.stringifyException(t)); + } + } while (!stop.boolVal); + } + + private void clean(CompactionInfo ci) throws MetaException { + LOG.info("Starting cleaning for " + ci.getFullPartitionName()); + try { + StorageDescriptor sd = resolveStorageDescriptor(resolveTable(ci), resolvePartition(ci)); + final String location = sd.getLocation(); + + // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open + // transactions. This assures that all deltas are treated as valid and all we return are + // obsolete files. 
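      // For illustration (the directory names here are hypothetical): with every transaction
      // treated as committed, a layout such as
      //
      //   base_0000020/  delta_0000021_0000030/  delta_0000021_0000025/  delta_0000026_0000030/
      //
      // should have the two narrower deltas reported by dir.getObsolete() once the compacted
      // delta_0000021_0000030 exists, and those obsolete paths are the only ones removeFiles()
      // deletes.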
+ final ValidTxnList txnList = new ValidTxnListImpl(); + + if (runJobAsSelf(ci.runAs)) { + removeFiles(location, txnList); + } else { + LOG.info("Cleaning as user " + ci.runAs); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + removeFiles(location, txnList); + return null; + } + }); + } + + } catch (Exception e) { + LOG.error("Caught exception when cleaning, unable to complete cleaning " + + StringUtils.stringifyException(e)); + } finally { + // We need to clean this out one way or another. + txnHandler.markCleaned(ci); + } + } + + private void removeFiles(String location, ValidTxnList txnList) throws IOException { + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList); + List obsoleteDirs = dir.getObsolete(); + List filesToDelete = new ArrayList(obsoleteDirs.size()); + for (FileStatus stat : obsoleteDirs) { + filesToDelete.add(stat.getPath()); + } + if (filesToDelete.size() < 1) { + LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + + ", that hardly seems right."); + return; + } + FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + + for (Path dead : filesToDelete) { + LOG.debug("Doing to delete path " + dead.toString()); + fs.delete(dead, true); + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java new file mode 100644 index 0000000..6366357 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -0,0 +1,719 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.util.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.*; +import java.util.regex.Matcher; + +/** + * Class to do compactions via an MR job. This has to be in the ql package rather than metastore + * .compactions package with all of it's relatives because it needs access to the actual input + * and output formats, which are in ql. ql depends on metastore and we can't have a circular + * dependency. 
+ */ +public class CompactorMR { + + static final private String CLASS_NAME = CompactorMR.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + static final private String INPUT_FORMAT_CLASS_NAME = "hive.compactor.input.format.class.name"; + static final private String OUTPUT_FORMAT_CLASS_NAME = "hive.compactor.output.format.class.name"; + static final private String TMP_LOCATION = "hive.compactor.input.tmp.dir"; + static final private String FINAL_LOCATION = "hive.compactor.input.dir"; + static final private String MIN_TXN = "hive.compactor.txn.min"; + static final private String MAX_TXN = "hive.compactor.txn.max"; + static final private String IS_MAJOR = "hive.compactor.is.major"; + static final private String IS_COMPRESSED = "hive.compactor.is.compressed"; + static final private String TABLE_PROPS = "hive.compactor.table.props"; + static final private String NUM_BUCKETS = "hive.compactor.num.buckets"; + static final private String BASE_DIR = "hive.compactor.base.dir"; + static final private String DELTA_DIRS = "hive.compactor.delta.dirs"; + static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search"; + static final private String TMPDIR = "_tmp"; + + public CompactorMR() { + } + + /** + * Run a compactor job. + * @param conf Hive configuration file + * @param jobName name to run this job with + * @param t metastore table + * @param sd metastore storage descriptor + * @param txns list of valid transactions + * @param isMajor is this a major compaction? + * @throws java.io.IOException if the job fails + */ + void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, + ValidTxnList txns, boolean isMajor) throws IOException { + JobConf job = new JobConf(conf); + job.setJobName(jobName); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setJarByClass(CompactorMR.class); + LOG.debug("User jar set to " + job.getJar()); + job.setMapperClass(CompactorMap.class); + job.setNumReduceTasks(0); + job.setInputFormat(CompactorInputFormat.class); + job.setOutputFormat(NullOutputFormat.class); + job.setOutputCommitter(CompactorOutputCommitter.class); + + job.set(FINAL_LOCATION, sd.getLocation()); + job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR); + job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat()); + job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat()); + job.setBoolean(IS_MAJOR, isMajor); + job.setBoolean(IS_COMPRESSED, sd.isCompressed()); + job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); + job.setInt(NUM_BUCKETS, sd.getBucketColsSize()); + job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); + setColumnTypes(job, sd.getCols()); + + // Figure out and encode what files we need to read. We do this here (rather than in + // getSplits below) because as part of this we discover our minimum and maximum transactions, + // and discovering that in getSplits is too late as we then have no way to pass it to our + // mapper. + + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); + StringableList dirsToSearch = new StringableList(); + Path baseDir = null; + if (isMajor) { + // There may not be a base dir if the partition was empty before inserts or if this + // partition is just now being converted to ACID. 
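      // A sketch of the layouts this block has to handle, using the naming constants from
      // AcidUtils in this patch (the transaction ids are made up for illustration):
      //
      //   newly converted table:  <location>/000000_0, <location>/000001_0, ...   (originals only)
      //   compacted partition:    <location>/base_0000010/bucket_00000, ...
      //   deltas:                 <location>/delta_0000011_0000020/bucket_00000, ...
      //
      // When only originals exist, the partition location itself is passed as the "base" so the
      // input format will read the original files.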
+ baseDir = dir.getBaseDirectory(); + if (baseDir == null) { + List originalFiles = dir.getOriginalFiles(); + if (!(originalFiles == null) && !(originalFiles.size() == 0)) { + // There are original format files + for (FileStatus stat : originalFiles) { + dirsToSearch.add(stat.getPath()); + LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search"); + } + // Set base to the location so that the input format reads the original files. + baseDir = new Path(sd.getLocation()); + } + } else { + // add our base to the list of directories to search for files in. + LOG.debug("Adding base directory " + baseDir + " to dirs to search"); + dirsToSearch.add(baseDir); + } + } + + List parsedDeltas = dir.getCurrentDirectories(); + + if (parsedDeltas == null || parsedDeltas.size() == 0) { + // Seriously, no deltas? Can't compact that. + LOG.error("No delta files found to compact in " + sd.getLocation()); + return; + } + + StringableList deltaDirs = new StringableList(); + long minTxn = Long.MAX_VALUE; + long maxTxn = Long.MIN_VALUE; + for (AcidUtils.ParsedDelta delta : parsedDeltas) { + LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); + dirsToSearch.add(delta.getPath()); + deltaDirs.add(delta.getPath()); + minTxn = Math.min(minTxn, delta.getMinTransaction()); + maxTxn = Math.max(maxTxn, delta.getMaxTransaction()); + } + + if (baseDir != null) job.set(BASE_DIR, baseDir.toString()); + job.set(DELTA_DIRS, deltaDirs.toString()); + job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); + job.setLong(MIN_TXN, minTxn); + job.setLong(MAX_TXN, maxTxn); + LOG.debug("Setting minimum transaction to " + minTxn); + LOG.debug("Setting maximum transaction to " + maxTxn); + + JobClient.runJob(job).waitForCompletion(); + } + + /** + * Set the column names and types into the job conf for the input format + * to use. + * @param job the job to update + * @param cols the columns of the table + */ + private void setColumnTypes(JobConf job, List cols) { + StringBuilder colNames = new StringBuilder(); + StringBuilder colTypes = new StringBuilder(); + boolean isFirst = true; + for(FieldSchema col: cols) { + if (isFirst) { + isFirst = false; + } else { + colNames.append(','); + colTypes.append(','); + } + colNames.append(col.getName()); + colTypes.append(col.getType()); + } + job.set(serdeConstants.LIST_COLUMNS, colNames.toString()); + job.set(serdeConstants.LIST_COLUMN_TYPES, colTypes.toString()); + } + + static class CompactorInputSplit implements InputSplit { + private long length = 0; + private List locations; + private int bucketNum; + private Path base; + private Path[] deltas; + + public CompactorInputSplit() { + } + + /** + * + * @param hadoopConf + * @param bucket bucket to be processed by this split + * @param files actual files this split should process. It is assumed the caller has already + * parsed out the files in base and deltas to populate this list. + * @param base directory of the base, or the partition/table location if the files are in old + * style. Can be null. + * @param deltas directories of the delta files.
+ * @throws IOException + */ + CompactorInputSplit(Configuration hadoopConf, int bucket, List files, Path base, + Path[] deltas) + throws IOException { + bucketNum = bucket; + this.base = base; + this.deltas = deltas; + locations = new ArrayList(); + + for (Path path : files) { + FileSystem fs = path.getFileSystem(hadoopConf); + FileStatus stat = fs.getFileStatus(path); + length += stat.getLen(); + BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, length); + for (int i = 0; i < locs.length; i++) { + String[] hosts = locs[i].getHosts(); + for (int j = 0; j < hosts.length; j++) { + locations.add(hosts[j]); + } + } + } + } + + @Override + public long getLength() throws IOException { + return length; + } + + @Override + public String[] getLocations() throws IOException { + return locations.toArray(new String[locations.size()]); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(length); + dataOutput.writeInt(locations.size()); + for (int i = 0; i < locations.size(); i++) { + dataOutput.writeInt(locations.get(i).length()); + dataOutput.writeBytes(locations.get(i)); + } + dataOutput.writeInt(bucketNum); + if (base == null) { + dataOutput.writeInt(0); + } else { + dataOutput.writeInt(base.toString().length()); + dataOutput.writeBytes(base.toString()); + } + dataOutput.writeInt(deltas.length); + for (int i = 0; i < deltas.length; i++) { + dataOutput.writeInt(deltas[i].toString().length()); + dataOutput.writeBytes(deltas[i].toString()); + } + + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + int len; + byte[] buf; + + locations = new ArrayList(); + length = dataInput.readLong(); + LOG.debug("Read length of " + length); + int numElements = dataInput.readInt(); + LOG.debug("Read numElements of " + numElements); + for (int i = 0; i < numElements; i++) { + len = dataInput.readInt(); + LOG.debug("Read file length of " + len); + buf = new byte[len]; + dataInput.readFully(buf); + locations.add(new String(buf)); + } + bucketNum = dataInput.readInt(); + LOG.debug("Read bucket number of " + bucketNum); + len = dataInput.readInt(); + LOG.debug("Read base path length of " + len); + if (len > 0) { + buf = new byte[len]; + dataInput.readFully(buf); + base = new Path(new String(buf)); + } + numElements = dataInput.readInt(); + deltas = new Path[numElements]; + for (int i = 0; i < numElements; i++) { + len = dataInput.readInt(); + buf = new byte[len]; + dataInput.readFully(buf); + deltas[i] = new Path(new String(buf)); + } + } + + public void set(CompactorInputSplit other) { + length = other.length; + locations = other.locations; + bucketNum = other.bucketNum; + base = other.base; + deltas = other.deltas; + } + + int getBucket() { + return bucketNum; + } + + Path getBaseDir() { + return base; + } + + Path[] getDeltaDirs() { + return deltas; + } + + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CompactorInputSplit{base: "); + builder.append(base); + builder.append(", bucket: "); + builder.append(bucketNum); + builder.append(", length: "); + builder.append(length); + builder.append(", deltas: ["); + for(int i=0; i < deltas.length; ++i) { + if (i != 0) { + builder.append(", "); + } + builder.append(deltas[i].getName()); + } + builder.append("]}"); + return builder.toString(); + } + } + + /** + * This input format returns its own input split as a value. This is because our splits + * contain information needed to properly construct the writer. Crazy, huh? 
+ */ + static class CompactorInputFormat implements InputFormat { + + @Override + public InputSplit[] getSplits(JobConf entries, int i) throws IOException { + Path baseDir = null; + if (entries.get(BASE_DIR) != null) baseDir = new Path(entries.get(BASE_DIR)); + StringableList tmpDeltaDirs = new StringableList(entries.get(DELTA_DIRS)); + Path[] deltaDirs = tmpDeltaDirs.toArray(new Path[tmpDeltaDirs.size()]); + StringableList dirsToSearch = new StringableList(entries.get(DIRS_TO_SEARCH)); + Map splitToBucketMap = new HashMap(); + for (Path dir : dirsToSearch) { + FileSystem fs = dir.getFileSystem(entries); + + // If this is a base or delta directory, then we need to be looking for the bucket files. + // But if it's a legacy file then we need to add it directly. + if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || + dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); + FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); + for (int j = 0; j < files.length; j++) { + // For each file, figure out which bucket it is. + Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(files[j].getPath().getName()); + addFileToMap(matcher, files[j].getPath(), sawBase, splitToBucketMap); + } + } else { + // Legacy file, see if it's a bucket file + Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName()); + addFileToMap(matcher, dir, true, splitToBucketMap); + } + } + List splits = new ArrayList(splitToBucketMap.size()); + for (Map.Entry e : splitToBucketMap.entrySet()) { + BucketTracker bt = e.getValue(); + splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets, + bt.sawBase ? baseDir : null, deltaDirs)); + } + + LOG.debug("Returning " + splits.size() + " splits"); + return splits.toArray(new InputSplit[splits.size()]); + } + + @Override + public RecordReader getRecordReader( + InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + return new CompactorRecordReader((CompactorInputSplit)inputSplit); + } + + private void addFileToMap(Matcher matcher, Path file, boolean sawBase, + Map splitToBucketMap) { + if (!matcher.find()) { + LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! 
" + + file.toString()); + } + int bucketNum = Integer.valueOf(matcher.group()); + BucketTracker bt = splitToBucketMap.get(bucketNum); + if (bt == null) { + bt = new BucketTracker(); + splitToBucketMap.put(bucketNum, bt); + } + LOG.debug("Adding " + file.toString() + " to list of files for splits"); + bt.buckets.add(file); + bt.sawBase |= sawBase; + } + + private static class BucketTracker { + BucketTracker() { + sawBase = false; + buckets = new ArrayList(); + } + + boolean sawBase; + List buckets; + } + } + + static class CompactorRecordReader + implements RecordReader { + private CompactorInputSplit split; + + CompactorRecordReader(CompactorInputSplit split) { + this.split = split; + } + + @Override + public boolean next(NullWritable key, + CompactorInputSplit compactorInputSplit) throws IOException { + if (split != null) { + compactorInputSplit.set(split); + split = null; + return true; + } + return false; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public CompactorInputSplit createValue() { + return new CompactorInputSplit(); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } + } + + static class CompactorMap + implements Mapper { + + JobConf jobConf; + FSRecordWriter writer; + + @Override + public void map(NullWritable key, CompactorInputSplit split, + OutputCollector nullWritableVOutputCollector, + Reporter reporter) throws IOException { + // This will only get called once, since CompactRecordReader only returns one record, + // the input split. + // Based on the split we're passed we go instantiate the real reader and then iterate on it + // until it finishes. 
+ AcidInputFormat aif = + instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); + ValidTxnList txnList = + new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + + AcidInputFormat.RawReader reader = + aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(), + txnList, split.getBaseDir(), split.getDeltaDirs()); + RecordIdentifier identifier = reader.createKey(); + V value = reader.createValue(); + getWriter(reporter, reader.getObjectInspector(), split.getBucket()); + while (reader.next(identifier, value)) { + writer.write(value); + reporter.progress(); + } + } + + @Override + public void configure(JobConf entries) { + jobConf = entries; + } + + @Override + public void close() throws IOException { + if (writer != null) { + writer.close(false); + } + } + + private void getWriter(Reporter reporter, ObjectInspector inspector, + int bucket) throws IOException { + if (writer == null) { + AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); + options.inspector(inspector) + .writingBase(jobConf.getBoolean(IS_MAJOR, false)) + .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) + .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) + .reporter(reporter) + .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(bucket); + + // Instantiate the underlying output format + AcidOutputFormat aof = + instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); + + writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); + } + } + + } + + static class StringableMap extends HashMap { + + StringableMap(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.valueOf(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.valueOf(parts[0]); + String key = null; + if (len > 0) key = parts[1].substring(0, len); + parts = parts[1].substring(len).split(":", 2); + len = Integer.valueOf(parts[0]); + String value = null; + if (len > 0) value = parts[1].substring(0, len); + s = parts[1].substring(len); + put(key, value); + } + } + + StringableMap(Map m) { + super(m); + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Map.Entry entry : entrySet()) { + int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); + buf.append(entry.getKey() == null ? 0 : length); + buf.append(':'); + if (length > 0) buf.append(entry.getKey()); + length = (entry.getValue() == null) ? 
0 : entry.getValue().length(); + buf.append(length); + buf.append(':'); + if (length > 0) buf.append(entry.getValue()); + } + } + return buf.toString(); + } + + public Properties toProperties() { + Properties props = new Properties(); + props.putAll(this); + return props; + } + } + + static class StringableList extends ArrayList { + StringableList() { + + } + + StringableList(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.valueOf(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.valueOf(parts[0]); + String val = parts[1].substring(0, len); + s = parts[1].substring(len); + add(new Path(val)); + } + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Path p : this) { + buf.append(p.toString().length()); + buf.append(':'); + buf.append(p.toString()); + } + } + return buf.toString(); + } + } + + private static T instantiate(Class classType, String classname) throws IOException { + T t = null; + try { + Class c = Class.forName(classname); + Object o = c.newInstance(); + if (classType.isAssignableFrom(o.getClass())) { + t = (T)o; + } else { + String s = classname + " is not an instance of " + classType.getName(); + LOG.error(s); + throw new IOException(s); + } + } catch (ClassNotFoundException e) { + LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } catch (InstantiationException e) { + LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } catch (IllegalAccessException e) { + LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); + throw new IOException(e); + } + return t; + } + + static class CompactorOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext jobContext) throws IOException { + + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + + } + + @Override + public void commitJob(JobContext context) throws IOException { + Path tmpLocation = new Path(context.getJobConf().get(TMP_LOCATION)); + Path finalLocation = new Path(context.getJobConf().get(FINAL_LOCATION)); + FileSystem fs = tmpLocation.getFileSystem(context.getJobConf()); + LOG.debug("Moving contents of " + tmpLocation.toString() + " to " + + finalLocation.toString()); + + FileStatus[] contents = fs.listStatus(tmpLocation); + for (int i = 0; i < contents.length; i++) { + Path newPath = new Path(finalLocation, contents[i].getPath().getName()); + fs.rename(contents[i].getPath(), newPath); + } + fs.delete(tmpLocation, true); + } + + @Override + public void abortJob(JobContext context, int status) throws IOException { + Path tmpLocation = new Path(context.getJobConf().get(TMP_LOCATION)); + FileSystem fs = tmpLocation.getFileSystem(context.getJobConf()); + LOG.debug("Removing " + tmpLocation.toString()); + fs.delete(tmpLocation, true); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java new file mode 100644 index 0000000..715f9c0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreThread; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.RawStoreProxy; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; + +/** + * Superclass for all threads in the compactor. + */ +abstract class CompactorThread extends Thread implements MetaStoreThread { + static final private String CLASS_NAME = CompactorThread.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + protected HiveConf conf; + protected CompactionTxnHandler txnHandler; + protected RawStore rs; + protected int threadId; + protected BooleanPointer stop; + + @Override + public void setHiveConf(HiveConf conf) { + this.conf = conf; + } + + @Override + public void setThreadId(int threadId) { + this.threadId = threadId; + + } + + @Override + public void init(BooleanPointer stop) throws MetaException { + this.stop = stop; + setPriority(MIN_PRIORITY); + setDaemon(true); // this means the process will exit without waiting for this thread + + // Get our own instance of the transaction handler + txnHandler = new CompactionTxnHandler(conf); + + // Get our own connection to the database so we can get table and partition information. + rs = RawStoreProxy.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId); + } + + /** + * Find the table being compacted + * @param ci compaction info returned from the compaction queue + * @return metastore table + * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found. 
+ */ + protected Table resolveTable(CompactionInfo ci) throws MetaException { + try { + return rs.getTable(ci.dbname, ci.tableName); + } catch (MetaException e) { + LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); + throw e; + } + } + + /** + * Get the partition being compacted. + * @param ci compaction info returned from the compaction queue + * @return metastore partition, or null if there is not partition in this compaction info + * @throws Exception if underlying calls throw, or if the partition name resolves to more than + * one partition. + */ + protected Partition resolvePartition(CompactionInfo ci) throws Exception { + Partition p = null; + if (ci.partName != null) { + List names = new ArrayList(1); + names.add(ci.partName); + List parts = null; + try { + parts = rs.getPartitionsByNames(ci.dbname, ci.tableName, names); + } catch (Exception e) { + LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage()); + throw e; + } + if (parts.size() != 1) { + LOG.error(ci.getFullPartitionName() + " does not refer to a single partition"); + throw new MetaException("Too many partitions"); + } + return parts.get(0); + } else { + return null; + } + } + + /** + * Get the storage descriptor for a compaction. + * @param t table from {@link #resolveTable(org.apache.hadoop.hive.metastore.txn.CompactionInfo)} + * @param p table from {@link #resolvePartition(org.apache.hadoop.hive.metastore.txn.CompactionInfo)} + * @return metastore storage descriptor. + */ + protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) { + return (p == null) ? t.getSd() : p.getSd(); + } + + /** + * Determine which user to run an operation as, based on the owner of the directory to be + * compacted. It is asserted that either the user running the hive metastore or the table + * owner must be able to stat the directory and determine the owner. + * @param location directory that will be read or written to. + * @param t metastore table object + * @return username of the owner of the location. + * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat + * the location. + */ + protected String findUserToRunAs(String location, Table t) throws IOException, + InterruptedException { + LOG.debug("Determining who to run the job as."); + final Path p = new Path(location); + final FileSystem fs = p.getFileSystem(conf); + try { + FileStatus stat = fs.getFileStatus(p); + LOG.debug("Running job as " + stat.getOwner()); + return stat.getOwner(); + } catch (AccessControlException e) { + // TODO not sure this is the right exception + LOG.debug("Unable to stat file as current user, trying as table owner"); + + // Now, try it as the table owner and see if we get better luck. + final List wrapper = new ArrayList(1); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + FileStatus stat = fs.getFileStatus(p); + wrapper.add(stat.getOwner()); + return null; + } + }); + + if (wrapper.size() == 1) { + LOG.debug("Running job as " + wrapper.get(0)); + return wrapper.get(0); + } + } + LOG.error("Unable to stat file as either current user or table owner, giving up"); + throw new IOException("Unable to stat file"); + } + + /** + * Determine whether to run this job as the current user or whether we need a doAs to switch + * users. 
+ * @param owner of the directory we will be working in, as determined by + * {@link #findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)} + * @return true if the job should run as the current user, false if a doAs is needed. + */ + protected boolean runJobAsSelf(String owner) { + return (owner.equals(System.getProperty("user.name"))); + } + + protected String tableName(Table t) { + return t.getDbName() + "." + t.getTableName(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java new file mode 100644 index 0000000..3211759 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.List; +import java.util.Set; + +/** + * A class to initiate compactions. This will run in a separate thread. + */ +public class Initiator extends CompactorThread { + static final private String CLASS_NAME = Initiator.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private int threadId = 10000; + + static final private String NO_COMPACTION = "NO_AUTO_COMPACTION"; + + private long checkInterval; + + @Override + public void run() { + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. 
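    // A condensed sketch of the control flow that follows, for orientation only:
    //
    //   recoverFailedCompactions(false);                 // reclaim work from dead local workers
    //   do {
    //     for (CompactionInfo ci : txnHandler.findPotentialCompactions(threshold)) {
    //       if (table is not marked NO_AUTO_COMPACTION
    //           && no compaction already initiated or working for ci
    //           && checkForCompaction(ci, txns, sd, runAs) != null) {
    //         requestCompaction(ci, runAs, type);
    //       }
    //     }
    //     recoverFailedCompactions(true);                // time out remote workers
    //     txnHandler.cleanEmptyAbortedTxns();
    //     sleep for the remainder of checkInterval;
    //   } while (!stop.boolVal);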
+ try { + recoverFailedCompactions(false); + + int abortedThreashold = HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD); + + // Make sure we run through the loop once before checking to stop as this makes testing + // much easier. The stop value is only for testing anyway and not used when called from + // HiveMetaStore. + do { + long startedAt = System.currentTimeMillis(); + + // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop + // don't doom the entire thread. + try { + ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); + ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); + Set potentials = txnHandler.findPotentialCompactions(abortedThreashold); + LOG.debug("Found " + potentials.size() + " potential compactions, " + + "checking to see if we should compact any of them"); + for (CompactionInfo ci : potentials) { + LOG.debug("Checking to see if we should compact " + ci.getFullPartitionName()); + try { + Table t = resolveTable(ci); + // check if no compaction set for this table + if (t.getParameters().get(NO_COMPACTION) != null) { + LOG.info("Table " + tableName(t) + " marked " + NO_COMPACTION + + " so we will not compact it."); + continue; + } + + // Check if we already have initiated or are working on a compaction for this partition + // or table. If so, skip it. If we are just waiting on cleaning we can still check, + // as it may be time to compact again even though we haven't cleaned. + if (lookForCurrentCompactions(currentCompactions, ci)) { + LOG.debug("Found currently initiated or working compaction for " + + ci.getFullPartitionName() + " so we will not initiate another compaction"); + continue; + } + + // Figure out who we should run the file operations as + Partition p = resolvePartition(ci); + StorageDescriptor sd = resolveStorageDescriptor(t, p); + String runAs = findUserToRunAs(sd.getLocation(), t); + + CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs); + if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); + } catch (Throwable t) { + LOG.error("Caught exception while trying to determine if we should compact " + + ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " + + "" + StringUtils.stringifyException(t)); + txnHandler.markCleaned(ci); + } + } + + // Check for timed out remote workers. + recoverFailedCompactions(true); + + // Clean anything from the txns table that has no components left in txn_components. 
+          txnHandler.cleanEmptyAbortedTxns();
+        } catch (Throwable t) {
+          LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
+              StringUtils.stringifyException(t));
+        }
+
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime >= checkInterval || stop.boolVal) continue;
+        else Thread.sleep(checkInterval - elapsedTime);
+
+      } while (!stop.boolVal);
+    } catch (Throwable t) {
+      LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
+          StringUtils.stringifyException(t));
+    }
+  }
+
+  @Override
+  public void init(BooleanPointer stop) throws MetaException {
+    super.init(stop);
+    checkInterval =
+        HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000;
+  }
+
+  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
+    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname());
+    txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT));
+  }
+
+  // Figure out if there are any currently running compactions on the same table or partition.
+  private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
+                                            CompactionInfo ci) {
+    if (compactions.getCompacts() != null) {
+      for (ShowCompactResponseElement e : compactions.getCompacts()) {
+        if (!e.getState().equals(TxnHandler.CLEANING_RESPONSE) &&
+            e.getDbname().equals(ci.dbname) &&
+            e.getTablename().equals(ci.tableName) &&
+            (e.getPartitionname() == null ? ci.partName == null :
+                e.getPartitionname().equals(ci.partName))) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private CompactionType checkForCompaction(final CompactionInfo ci,
+                                            final ValidTxnList txns,
+                                            final StorageDescriptor sd,
+                                            final String runAs)
+      throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " +
+          "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, txns, sd);
+    } else {
+      LOG.info("Going to initiate as user " + runAs);
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
+          UserGroupInformation.getLoginUser());
+      return ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
+        @Override
+        public CompactionType run() throws Exception {
+          return determineCompactionType(ci, txns, sd);
+        }
+      });
+    }
+  }
+
+  private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
+                                                 StorageDescriptor sd)
+      throws IOException, InterruptedException {
+    boolean noBase = false;
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+    Path base = dir.getBaseDirectory();
+    long baseSize = 0;
+    FileStatus stat = null;
+    if (base != null) {
+      stat = fs.getFileStatus(base);
+      if (!stat.isDir()) {
+        LOG.error("Was assuming base " + base.toString() + " is a directory, but it's a file!");
+        return null;
+      }
+      baseSize = sumDirSize(fs, base);
+    }
+
+    List<FileStatus> originals = dir.getOriginalFiles();
+    for (FileStatus origStat : originals) {
+      baseSize += origStat.getLen();
+    }
+
+    long deltaSize = 0;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    for (AcidUtils.ParsedDelta delta : deltas) {
+      stat = fs.getFileStatus(delta.getPath());
+      if (!stat.isDir()) {
+        LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " +
+            "but it's a file!");
+        return null;
+      }
+      deltaSize += sumDirSize(fs, delta.getPath());
+    }
+
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      float deltaPctThreshold = HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold;
+      if (LOG.isDebugEnabled()) {
+        StringBuffer msg = new StringBuffer("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" will major compact: ");
+        msg.append(bigEnough);
+        LOG.debug(msg);
+      }
+      if (bigEnough) return CompactionType.MAJOR;
+    }
+
+    int deltaNumThreshold = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (enough) {
+      LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
+          ", requesting " + (noBase ? "major" : "minor") +
+          " compaction");
+      // If there's no base file, do a major compaction
+      return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
+    }
+    return null;
+  }
+
+  private long sumDirSize(FileSystem fs, Path dir) throws IOException {
+    long size = 0;
+    FileStatus[] buckets = fs.listStatus(dir);
+    for (int i = 0; i < buckets.length; i++) {
+      size += buckets[i].getLen();
+    }
+    return size;
+  }
+
+  private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException {
+    String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName();
+    LOG.info(s);
+    CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type);
+    if (ci.partName != null) rqst.setPartitionname(ci.partName);
+    rqst.setRunas(runAs);
+    txnHandler.compact(rqst);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
new file mode 100644
index 0000000..f464df8
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.PrivilegedExceptionAction; + +/** + * A class to do compactions. This will run in a separate thread. It will spin on the + * compaction queue and look for new work to do. + */ +public class Worker extends CompactorThread { + static final private String CLASS_NAME = Worker.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private long SLEEP_TIME = 5000; + static final private int baseThreadNum = 10002; + + private String name; + + /** + * Get the hostname that this worker is run on. Made static and public so that other classes + * can use the same method to know what host their worker threads are running on. + * @return hostname + */ + public static String hostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve my host name " + e.getMessage()); + throw new RuntimeException(e); + } + } + + @Override + public void run() { + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. + try { + do { + CompactionInfo ci = txnHandler.findNextToCompact(name); + + if (ci == null && !stop.boolVal) { + try { + Thread.sleep(SLEEP_TIME); + continue; + } catch (InterruptedException e) { + LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + continue; + } + } + + // Find the table we will be working with. + Table t1 = null; + try { + t1 = resolveTable(ci); + } catch (MetaException e) { + txnHandler.markCleaned(ci); + continue; + } + // This chicanery is to get around the fact that the table needs to be final in order to + // go into the doAs below. + final Table t = t1; + + // Find the partition we will be working with, if there is one. + Partition p = null; + try { + p = resolvePartition(ci); + } catch (Exception e) { + txnHandler.markCleaned(ci); + continue; + } + + // Find the appropriate storage descriptor + final StorageDescriptor sd = resolveStorageDescriptor(t, p); + + // Check that the table or partition isn't sorted, as we don't yet support that. 
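+        // The MR compactor merges rows by record identifier, which would not preserve a
+        // user-declared sort order in the compacted output.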
+ if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { + LOG.error("Attempt to compact sorted table, which is not yet supported!"); + txnHandler.markCleaned(ci); + continue; + } + + final boolean isMajor = (ci.type == CompactionType.MAJOR); + final ValidTxnList txns = + TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); + final StringBuffer jobName = new StringBuffer(name); + jobName.append("-compactor-"); + jobName.append(ci.getFullPartitionName()); + + // Determine who to run as + String runAs; + if (ci.runAs == null) { + runAs = findUserToRunAs(sd.getLocation(), t); + txnHandler.setRunAs(ci.id, runAs); + } else { + runAs = ci.runAs; + } + + LOG.info("Starting " + ci.type.toString() + " compaction for " + + ci.getFullPartitionName()); + + final CompactorMR mr = new CompactorMR(); + try { + if (runJobAsSelf(runAs)) { + mr.run(conf, jobName.toString(), t, sd, txns, isMajor); + } else { + UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + mr.run(conf, jobName.toString(), t, sd, txns, isMajor); + return null; + } + }); + } + txnHandler.markCompacted(ci); + } catch (Exception e) { + LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() + + ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e)); + txnHandler.markCleaned(ci); + } + } while (!stop.boolVal); + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor worker " + name + + ", exiting " + StringUtils.stringifyException(t)); + } + } + + @Override + public void init(BooleanPointer stop) throws MetaException { + super.init(stop); + + StringBuffer name = new StringBuffer(hostname()); + name.append("-"); + name.append(getId()); + this.name = name.toString(); + setName(name.toString()); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java new file mode 100644 index 0000000..eaabc71 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreThread; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.Progressable; +import org.apache.thrift.TException; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Stack; + +/** + * Super class for all of the compactor test modules. + */ +public abstract class CompactorTest { + static final private String CLASS_NAME = CompactorTest.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + protected CompactionTxnHandler txnHandler; + protected IMetaStoreClient ms; + protected long sleepTime = 1000; + + private MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); + private File tmpdir; + + protected CompactorTest() throws Exception { + HiveConf conf = new HiveConf(); + TxnDbUtil.setConfValues(conf); + TxnDbUtil.cleanDb(); + ms = new HiveMetaStoreClient(conf); + txnHandler = new CompactionTxnHandler(conf); + tmpdir = new File(System.getProperty("java.io.tmpdir") + + System.getProperty("file.separator") + "compactor_test_tables"); + tmpdir.mkdir(); + tmpdir.deleteOnExit(); + } + + protected void startInitiator(HiveConf conf) throws Exception { + startThread('i', conf); + } + + protected void startWorker(HiveConf conf) throws Exception { + startThread('w', conf); + } + + protected void startCleaner(HiveConf conf) throws Exception { + startThread('c', conf); + } + + protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException { + return newTable(dbName, tableName, partitioned, new HashMap(), null); + } + + protected Table newTable(String dbName, String tableName, boolean partitioned, + Map parameters) throws TException { + return newTable(dbName, tableName, partitioned, parameters, null); + + } + + protected Table newTable(String dbName, String tableName, boolean partitioned, + Map parameters, List sortCols) + throws TException { + Table table = new Table(); + table.setTableName(tableName); + table.setDbName(dbName); + table.setOwner("me"); + table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols)); + List partKeys = new ArrayList(1); + if (partitioned) { + partKeys.add(new FieldSchema("ds", 
"string", "no comment")); + table.setPartitionKeys(partKeys); + } + + table.setParameters(parameters); + + ms.createTable(table); + return table; + } + + protected Partition newPartition(Table t, String value) throws Exception { + return newPartition(t, value, null); + } + + protected Partition newPartition(Table t, String value, List sortCols) throws Exception { + Partition part = new Partition(); + part.addToValues(value); + part.setDbName(t.getDbName()); + part.setTableName(t.getTableName()); + part.setSd(newStorageDescriptor(getLocation(t.getTableName(), value), sortCols)); + part.setParameters(new HashMap()); + ms.add_partition(part); + return part; + } + + protected long openTxn() throws MetaException { + List txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"), + Worker.hostname())).getTxn_ids(); + return txns.get(0); + } + + protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, + int numRecords) throws Exception{ + addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true); + } + + protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn, + int numRecords) throws Exception{ + addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true); + } + + protected void addLegacyFile(HiveConf conf, Table t, Partition p, + int numRecords) throws Exception { + addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, 2, true); + } + + protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, + int numRecords, int numBuckets, boolean allBucketsPresent) + throws Exception { + addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent); + } + + protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn, + int numRecords, int numBuckets, boolean allBucketsPresent) + throws Exception { + addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent); + } + + protected void addLegacyFile(HiveConf conf, Table t, Partition p, + int numRecords, int numBuckets, boolean allBucketsPresent) + throws Exception { + addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, numBuckets, allBucketsPresent); + } + + protected List getDirectories(HiveConf conf, Table t, Partition p) throws Exception { + String partValue = (p == null) ? 
null : p.getValues().get(0); + String location = getLocation(t.getTableName(), partValue); + Path dir = new Path(location); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stats = fs.listStatus(dir); + List paths = new ArrayList(stats.length); + for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath()); + return paths; + } + + protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnException, TxnAbortedException { + OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost")); + for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid)); + } + + private StorageDescriptor newStorageDescriptor(String location, List sortCols) { + StorageDescriptor sd = new StorageDescriptor(); + List cols = new ArrayList(2); + cols.add(new FieldSchema("a", "varchar(25)", "still no comment")); + cols.add(new FieldSchema("b", "int", "comment")); + sd.setCols(cols); + sd.setLocation(location); + sd.setInputFormat(MockInputFormat.class.getName()); + sd.setOutputFormat(MockOutputFormat.class.getName()); + sd.setNumBuckets(1); + SerDeInfo serde = new SerDeInfo(); + serde.setSerializationLib(LazySimpleSerDe.class.getName()); + sd.setSerdeInfo(serde); + List bucketCols = new ArrayList(1); + bucketCols.add("a"); + sd.setBucketCols(bucketCols); + + if (sortCols != null) { + sd.setSortCols(sortCols); + } + return sd; + } + + // I can't do this with @Before because I want to be able to control the config file provided + // to each test. + private void startThread(char type, HiveConf conf) throws Exception { + TxnDbUtil.setConfValues(conf); + CompactorThread t = null; + switch (type) { + case 'i': t = new Initiator(); break; + case 'w': t = new Worker(); break; + case 'c': t = new Cleaner(); break; + default: throw new RuntimeException("Huh? Unknown thread type."); + } + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + stop.boolVal = true; + t.init(stop); + t.run(); + } + + private String getLocation(String tableName, String partValue) { + String location = tmpdir.getAbsolutePath() + + System.getProperty("file.separator") + tableName; + if (partValue != null) { + location += System.getProperty("file.separator") + "ds=" + partValue; + } + return location; + } + + private enum FileType {BASE, DELTA, LEGACY}; + + private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, + int numRecords, FileType type, int numBuckets, + boolean allBucketsPresent) throws Exception { + String partValue = (p == null) ? 
null : p.getValues().get(0); + Path location = new Path(getLocation(t.getTableName(), partValue)); + String filename = null; + switch (type) { + case BASE: filename = "base_" + maxTxn; break; + case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break; + case LEGACY: break; // handled below + } + + FileSystem fs = FileSystem.get(conf); + for (int bucket = 0; bucket < numBuckets; bucket++) { + if (bucket == 0 && !allBucketsPresent) continue; // skip one + Path partFile = null; + if (type == FileType.LEGACY) { + partFile = new Path(location, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0"); + } else { + Path dir = new Path(location, filename); + fs.mkdirs(dir); + partFile = AcidUtils.createBucketFile(dir, bucket); + } + FSDataOutputStream out = fs.create(partFile); + for (int i = 0; i < numRecords; i++) { + RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i); + ri.write(out); + out.writeBytes("mary had a little lamb its fleece was white as snow\n"); + } + out.close(); + } + } + + static class MockInputFormat implements AcidInputFormat { + + @Override + public AcidInputFormat.RowReader getReader(InputSplit split, + Options options) throws + IOException { + return null; + } + + @Override + public RawReader getRawReader(Configuration conf, boolean collapseEvents, int bucket, + ValidTxnList validTxnList, + Path baseDirectory, Path... deltaDirectory) throws IOException { + + List filesToRead = new ArrayList(); + if (baseDirectory != null) { + if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { + Path p = AcidUtils.createBucketFile(baseDirectory, bucket); + FileSystem fs = p.getFileSystem(conf); + if (fs.exists(p)) filesToRead.add(p); + } else { + filesToRead.add(new Path(baseDirectory, "00000_0")); + + } + } + for (int i = 0; i < deltaDirectory.length; i++) { + Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket); + FileSystem fs = p.getFileSystem(conf); + if (fs.exists(p)) filesToRead.add(p); + } + return new MockRawReader(conf, filesToRead); + } + + @Override + public InputSplit[] getSplits(JobConf entries, int i) throws IOException { + return new InputSplit[0]; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, + Reporter reporter) throws IOException { + return null; + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList files) throws + IOException { + return false; + } + } + + static class MockRawReader implements AcidInputFormat.RawReader { + private Stack filesToRead; + private Configuration conf; + private FSDataInputStream is = null; + private FileSystem fs; + + MockRawReader(Configuration conf, List files) throws IOException { + filesToRead = new Stack(); + for (Path file : files) filesToRead.push(file); + this.conf = conf; + fs = FileSystem.get(conf); + } + + @Override + public ObjectInspector getObjectInspector() { + return null; + } + + @Override + public boolean next(RecordIdentifier identifier, Text text) throws IOException { + if (is == null) { + // Open the next file + if (filesToRead.empty()) return false; + Path p = filesToRead.pop(); + LOG.debug("Reading records from " + p.toString()); + is = fs.open(p); + } + String line = null; + try { + identifier.readFields(is); + line = is.readLine(); + } catch (EOFException e) { + } + if (line == null) { + // Set our current entry to null (since it's done) and try again. 
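+        // Recursing with is reset to null either opens the next file on the stack or
+        // returns false once the stack is empty.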
+ is = null; + return next(identifier, text); + } + text.set(line); + return true; + } + + @Override + public RecordIdentifier createKey() { + return new RecordIdentifier(); + } + + @Override + public Text createValue() { + return new Text(); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } + } + + // This class isn't used and I suspect does totally the wrong thing. It's only here so that I + // can provide some output format to the tables and partitions I create. I actually write to + // those tables directory. + static class MockOutputFormat implements AcidOutputFormat { + + @Override + public RecordUpdater getRecordUpdater(Path path, Options options) throws + IOException { + return null; + } + + @Override + public FSRecordWriter getRawRecordWriter(Path path, Options options) throws IOException { + return new MockRecordWriter(path, options); + } + + @Override + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, + boolean isCompressed, Properties tableProperties, + Progressable progress) throws IOException { + return null; + } + + @Override + public RecordWriter getRecordWriter(FileSystem fileSystem, JobConf entries, + String s, + Progressable progressable) throws + IOException { + return null; + } + + @Override + public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException { + + } + } + + // This class isn't used and I suspect does totally the wrong thing. It's only here so that I + // can provide some output format to the tables and partitions I create. I actually write to + // those tables directory. + static class MockRecordWriter implements FSRecordWriter { + private FSDataOutputStream os; + + MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException { + FileSystem fs = FileSystem.get(options.getConfiguration()); + Path p = AcidUtils.createFilename(basedir, options); + os = fs.create(p); + } + + @Override + public void write(Writable w) throws IOException { + Text t = (Text)w; + os.writeBytes(t.toString()); + os.writeBytes("\n"); + } + + @Override + public void close(boolean abort) throws IOException { + os.close(); + } + } + + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java new file mode 100644 index 0000000..dce2573 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for the compactor Cleaner thread + */ +public class TestCleaner extends CompactorTest { + public TestCleaner() throws Exception { + super(); + } + + @Test + public void nothing() throws Exception { + // Test that the whole things works when there's nothing in the queue. This is just a + // survival test. + startCleaner(new HiveConf()); + } + + @Test + public void cleanupAfterMajorTableCompaction() throws Exception { + Table t = newTable("default", "camtc", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addBaseFile(conf, t, null, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, null); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_25", paths.get(0).getName()); + } + + @Test + public void cleanupAfterMajorPartitionCompaction() throws Exception { + Table t = newTable("default", "campc", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(conf, t, p, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + + // Check there are no compactions requests left. 
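+    // markCleaned() removes the queue entry entirely, so the response is expected to
+    // carry no compact elements at all.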
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, p); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_25", paths.get(0).getName()); + } + + @Test + public void cleanupAfterMinorTableCompaction() throws Exception { + Table t = newTable("default", "camitc", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addDeltaFile(conf, t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, null); + Assert.assertEquals(2, paths.size()); + boolean sawBase = false, sawDelta = false; + for (Path p : paths) { + if (p.getName().equals("base_20")) sawBase = true; + else if (p.getName().equals("delta_21_24")) sawDelta = true; + else Assert.fail("Unexpected file " + p.getName()); + } + Assert.assertTrue(sawBase); + Assert.assertTrue(sawDelta); + } + + @Test + public void cleanupAfterMinorPartitionCompaction() throws Exception { + Table t = newTable("default", "camipc", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addDeltaFile(conf, t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + + // Check there are no compactions requests left. 
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, p); + Assert.assertEquals(2, paths.size()); + boolean sawBase = false, sawDelta = false; + for (Path path : paths) { + if (path.getName().equals("base_20")) sawBase = true; + else if (path.getName().equals("delta_21_24")) sawDelta = true; + else Assert.fail("Unexpected file " + path.getName()); + } + Assert.assertTrue(sawBase); + Assert.assertTrue(sawDelta); + } + + @Test + public void blockedByLockTable() throws Exception { + Table t = newTable("default", "bblt", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addDeltaFile(conf, t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); + comp.setTablename("bblt"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + startCleaner(conf); + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + Assert.assertEquals("bblt", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + } + + @Test + public void blockedByLockPartition() throws Exception { + Table t = newTable("default", "bblp", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addDeltaFile(conf, t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("bblp"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + startCleaner(conf); + + // Check there are no compactions requests left. 
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + Assert.assertEquals("bblp", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + } + + @Test + public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { + Table t = newTable("default", "campcnb", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addDeltaFile(conf, t, p, 1L, 22L, 22); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(conf, t, p, 25L, 25); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "campcnb", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + startCleaner(conf); + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + + // Check that the files are removed + List paths = getDirectories(conf, t, p); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_25", paths.get(0).getName()); + } + + @Before + public void setUpTxnDb() throws Exception { + TxnDbUtil.setConfValues(new HiveConf()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java new file mode 100644 index 0000000..6b46822 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -0,0 +1,634 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the compactor Initiator thread. 
+ */ +public class TestInitiator extends CompactorTest { + static final private String CLASS_NAME = TestInitiator.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + public TestInitiator() throws Exception { + super(); + } + + @Test + public void nothing() throws Exception { + // Test that the whole things works when there's nothing in the queue. This is just a + // survival test. + startInitiator(new HiveConf()); + } + + @Test + public void recoverFailedLocalWorkers() throws Exception { + Table t = newTable("default", "rflw1", false); + CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR); + txnHandler.compact(rqst); + + t = newTable("default", "rflw2", false); + rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR); + txnHandler.compact(rqst); + + txnHandler.findNextToCompact(Worker.hostname() + "-193892"); + txnHandler.findNextToCompact("nosuchhost-193892"); + + startInitiator(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(2, compacts.size()); + boolean sawInitiated = false; + for (ShowCompactResponseElement c : compacts) { + if (c.getState().equals("working")) { + Assert.assertEquals("nosuchhost-193892", c.getWorkerid()); + } else if (c.getState().equals("initiated")) { + sawInitiated = true; + } else { + Assert.fail("Unexpected state"); + } + } + Assert.assertTrue(sawInitiated); + } + + @Test + public void recoverFailedRemoteWorkers() throws Exception { + Table t = newTable("default", "rfrw1", false); + CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR); + txnHandler.compact(rqst); + + txnHandler.findNextToCompact("nosuchhost-193892"); + + HiveConf conf = new HiveConf(); + HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + } + + @Test + public void majorCompactOnTableTooManyAborts() throws Exception { + Table t = newTable("default", "mcottma", false); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("mcottma"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("mcottma", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void majorCompactOnPartitionTooManyAborts() throws Exception { + Table t = newTable("default", "mcoptma", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + 
for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("mcoptma"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("mcoptma", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void noCompactOnManyDifferentPartitionAborts() throws Exception { + Table t = newTable("default", "ncomdpa", true); + for (int i = 0; i < 11; i++) { + Partition p = newPartition(t, "day-" + i); + } + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("ncomdpa"); + comp.setPartitionname("ds=day-" + i); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + } + + @Test + public void cleanEmptyAbortedTxns() throws Exception { + // Test that we are cleaning aborted transactions with no components left in txn_components. + // Put one aborted transaction with an entry in txn_components to make sure we don't + // accidently clean it too. 
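+    // The lock taken below records a txn_components entry for this aborted transaction,
+    // so cleanEmptyAbortedTxns() must leave it alone.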
+ Table t = newTable("default", "ceat", false); + + HiveConf conf = new HiveConf(); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("ceat"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + + for (int i = 0; i < 100; i++) { + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(101, openTxns.getOpen_txnsSize()); + + startInitiator(conf); + + openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(1, openTxns.getOpen_txnsSize()); + } + + @Test + public void noCompactWhenNoCompactSet() throws Exception { + Map parameters = new HashMap(1); + parameters.put("NO_AUTO_COMPACTION", "true"); + Table t = newTable("default", "ncwncs", false, parameters); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("ncwncs"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + } + + @Test + public void noCompactWhenCompactAlreadyScheduled() throws Exception { + Table t = newTable("default", "ncwcas", false); + + HiveConf conf = new HiveConf(); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); + + for (int i = 0; i < 11; i++) { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("ncwcas"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + CompactionRequest rqst = new CompactionRequest("default", "ncwcas", CompactionType.MAJOR); + txnHandler.compact(rqst); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("ncwcas", compacts.get(0).getTablename()); + + startInitiator(conf); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("ncwcas", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void compactTableHighDeltaPct() throws Exception { + Table t = newTable("default", "cthdp", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(23); + + long 
txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("cthdp"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cthdp", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void compactPartitionHighDeltaPct() throws Exception { + Table t = newTable("default", "cphdp", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(23); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("cphdp"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cphdp", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void noCompactTableDeltaPctNotHighEnough() throws Exception { + Table t = newTable("default", "nctdpnhe", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 50L, 50); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(53); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("nctdpnhe"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + } + + @Test + public void compactTableTooManyDeltas() throws Exception { + Table t = newTable("default", "cttmd", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 200L, 200); + addDeltaFile(conf, t, null, 201L, 201L, 1); + addDeltaFile(conf, t, null, 202L, 202L, 1); + addDeltaFile(conf, t, null, 203L, 203L, 1); + addDeltaFile(conf, t, null, 204L, 204L, 1); + addDeltaFile(conf, t, null, 205L, 205L, 1); + addDeltaFile(conf, t, null, 206L, 206L, 1); + addDeltaFile(conf, t, null, 207L, 207L, 1); + addDeltaFile(conf, t, null, 208L, 208L, 1); + addDeltaFile(conf, t, null, 209L, 209L, 1); + addDeltaFile(conf, 
t, null, 210L, 210L, 1); + addDeltaFile(conf, t, null, 211L, 211L, 1); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("cttmd"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cttmd", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + } + + @Test + public void compactPartitionTooManyDeltas() throws Exception { + Table t = newTable("default", "cptmd", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 200L, 200); + addDeltaFile(conf, t, p, 201L, 201L, 1); + addDeltaFile(conf, t, p, 202L, 202L, 1); + addDeltaFile(conf, t, p, 203L, 203L, 1); + addDeltaFile(conf, t, p, 204L, 204L, 1); + addDeltaFile(conf, t, p, 205L, 205L, 1); + addDeltaFile(conf, t, p, 206L, 206L, 1); + addDeltaFile(conf, t, p, 207L, 207L, 1); + addDeltaFile(conf, t, p, 208L, 208L, 1); + addDeltaFile(conf, t, p, 209L, 209L, 1); + addDeltaFile(conf, t, p, 210L, 210L, 1); + addDeltaFile(conf, t, p, 211L, 211L, 1); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("cptmd"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cptmd", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); + } + + @Test + public void noCompactTableNotEnoughDeltas() throws Exception { + Table t = newTable("default", "nctned", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 200L, 200); + addDeltaFile(conf, t, null, 201L, 205L, 5); + addDeltaFile(conf, t, null, 206L, 211L, 6); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("nctned"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertNull(rsp.getCompacts()); + } + + @Test + public void chooseMajorOverMinorWhenBothValid() throws Exception { + Table t = newTable("default", "cmomwbv", 
false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 200L, 200); + addDeltaFile(conf, t, null, 201L, 211L, 11); + addDeltaFile(conf, t, null, 212L, 222L, 11); + addDeltaFile(conf, t, null, 223L, 233L, 11); + addDeltaFile(conf, t, null, 234L, 244L, 11); + addDeltaFile(conf, t, null, 245L, 255L, 11); + addDeltaFile(conf, t, null, 256L, 266L, 11); + addDeltaFile(conf, t, null, 267L, 277L, 11); + addDeltaFile(conf, t, null, 278L, 288L, 11); + addDeltaFile(conf, t, null, 289L, 299L, 11); + addDeltaFile(conf, t, null, 300L, 310L, 11); + addDeltaFile(conf, t, null, 311L, 321L, 11); + + burnThroughTransactions(320); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setTablename("cmomwbv"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("cmomwbv", compacts.get(0).getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void enoughDeltasNoBase() throws Exception { + Table t = newTable("default", "ednb", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addDeltaFile(conf, t, p, 1L, 201L, 200); + addDeltaFile(conf, t, p, 202L, 202L, 1); + addDeltaFile(conf, t, p, 203L, 203L, 1); + addDeltaFile(conf, t, p, 204L, 204L, 1); + addDeltaFile(conf, t, p, 205L, 205L, 1); + addDeltaFile(conf, t, p, 206L, 206L, 1); + addDeltaFile(conf, t, p, 207L, 207L, 1); + addDeltaFile(conf, t, p, 208L, 208L, 1); + addDeltaFile(conf, t, p, 209L, 209L, 1); + addDeltaFile(conf, t, p, 210L, 210L, 1); + addDeltaFile(conf, t, p, 211L, 211L, 1); + + burnThroughTransactions(210); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("ednb"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("ednb", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + @Test + public void twoTxnsOnSamePartitionGenerateOneCompactionRequest() throws Exception { + Table t = newTable("default", "ttospgocr", true); + Partition p = newPartition(t, "today"); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(23); + + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + 
comp.setTablename("ttospgocr"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + txnid = openTxn(); + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("ttospgocr"); + comp.setPartitionname("ds=today"); + components = new ArrayList(1); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + res = txnHandler.lock(req); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("initiated", compacts.get(0).getState()); + Assert.assertEquals("ttospgocr", compacts.get(0).getTablename()); + Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); + Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType()); + } + + // TODO test compactions with legacy file types + + @Before + public void setUpTxnDb() throws Exception { + TxnDbUtil.setConfValues(new HiveConf()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java new file mode 100644 index 0000000..1b9469d --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -0,0 +1,678 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the worker thread and its MR jobs. + */ +public class TestWorker extends CompactorTest { + static final private String CLASS_NAME = TestWorker.class.getName(); + static final private Log LOG = LogFactory.getLog(CLASS_NAME); + + public TestWorker() throws Exception { + super(); + } + + @Test + public void nothing() throws Exception { + // Test that the whole things works when there's nothing in the queue. This is just a + // survival test. 
+ startWorker(new HiveConf()); + } + + @Test + public void stringableMap() throws Exception { + // Empty map case + CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap()); + String s = m.toString(); + Assert.assertEquals("0:", s); + m = new CompactorMR.StringableMap(s); + Assert.assertEquals(0, m.size()); + + Map base = new HashMap(); + base.put("mary", "poppins"); + base.put("bert", null); + base.put(null, "banks"); + m = new CompactorMR.StringableMap(base); + s = m.toString(); + m = new CompactorMR.StringableMap(s); + Assert.assertEquals(3, m.size()); + Map saw = new HashMap(3); + saw.put("mary", false); + saw.put("bert", false); + saw.put(null, false); + for (Map.Entry e : m.entrySet()) { + saw.put(e.getKey(), true); + if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue()); + else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue()); + else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue()); + else Assert.fail("Unexpected value " + e.getKey()); + } + Assert.assertEquals(3, saw.size()); + Assert.assertTrue(saw.get("mary")); + Assert.assertTrue(saw.get("bert")); + Assert.assertTrue(saw.get(null)); + } + + @Test + public void stringableList() throws Exception { + // Empty list case + CompactorMR.StringableList ls = new CompactorMR.StringableList(); + String s = ls.toString(); + Assert.assertEquals("0:", s); + ls = new CompactorMR.StringableList(s); + Assert.assertEquals(0, ls.size()); + + ls = new CompactorMR.StringableList(); + ls.add(new Path("/tmp")); + ls.add(new Path("/usr")); + s = ls.toString(); + Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s, + "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s)); + ls = new CompactorMR.StringableList(s); + Assert.assertEquals(2, ls.size()); + boolean sawTmp = false, sawUsr = false; + for (Path p : ls) { + if ("/tmp".equals(p.toString())) sawTmp = true; + else if ("/usr".equals(p.toString())) sawUsr = true; + else Assert.fail("Unexpected path " + p.toString()); + } + Assert.assertTrue(sawTmp); + Assert.assertTrue(sawUsr); + } + + @Test + public void inputSplit() throws Exception { + String basename = "/warehouse/foo/base_1"; + String delta1 = "/warehouse/foo/delta_2_3"; + String delta2 = "/warehouse/foo/delta_4_7"; + + HiveConf conf = new HiveConf(); + Path file = new Path(System.getProperty("java.io.tmpdir") + + System.getProperty("file.separator") + "newWriteInputSplitTest"); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream os = fs.create(file); + for (int i = 0; i < 10; i++) { + os.writeBytes("mary had a little lamb its fleece was white as snow\n"); + } + os.close(); + List files = new ArrayList(1); + files.add(file); + + Path[] deltas = new Path[2]; + deltas[0] = new Path(delta1); + deltas[1] = new Path(delta2); + + CompactorMR.CompactorInputSplit split = + new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas); + + Assert.assertEquals(520L, split.getLength()); + String[] locations = split.getLocations(); + Assert.assertEquals(1, locations.length); + Assert.assertEquals("localhost", locations[0]); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(buf); + split.write(out); + + split = new CompactorMR.CompactorInputSplit(); + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + split.readFields(in); + + Assert.assertEquals(3, split.getBucket()); + Assert.assertEquals(basename, split.getBaseDir().toString()); + deltas = 
split.getDeltaDirs(); + Assert.assertEquals(2, deltas.length); + Assert.assertEquals(delta1, deltas[0].toString()); + Assert.assertEquals(delta2, deltas[1].toString()); + } + + @Test + public void inputSplitNullBase() throws Exception { + String delta1 = "/warehouse/foo/delta_2_3"; + String delta2 = "/warehouse/foo/delta_4_7"; + + HiveConf conf = new HiveConf(); + Path file = new Path(System.getProperty("java.io.tmpdir") + + System.getProperty("file.separator") + "newWriteInputSplitTest"); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream os = fs.create(file); + for (int i = 0; i < 10; i++) { + os.writeBytes("mary had a little lamb its fleece was white as snow\n"); + } + os.close(); + List files = new ArrayList(1); + files.add(file); + + Path[] deltas = new Path[2]; + deltas[0] = new Path(delta1); + deltas[1] = new Path(delta2); + + CompactorMR.CompactorInputSplit split = + new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(buf); + split.write(out); + + split = new CompactorMR.CompactorInputSplit(); + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + split.readFields(in); + + Assert.assertEquals(3, split.getBucket()); + Assert.assertNull(split.getBaseDir()); + deltas = split.getDeltaDirs(); + Assert.assertEquals(2, deltas.length); + Assert.assertEquals(delta1, deltas[0].toString()); + Assert.assertEquals(delta2, deltas[1].toString()); + } + + @Test + public void sortedTable() throws Exception { + List sortCols = new ArrayList(1); + sortCols.add(new Order("b", 1)); + + Table t = newTable("default", "st", false, new HashMap(), sortCols); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + addDeltaFile(conf, t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + } + + @Test + public void sortedPartition() throws Exception { + List sortCols = new ArrayList(1); + sortCols.add(new Order("b", 1)); + + Table t = newTable("default", "sp", true, new HashMap(), sortCols); + Partition p = newPartition(t, "today", sortCols); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + addDeltaFile(conf, t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "sp", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + // There should still be four directories in the location. 
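+ // Compaction is not supported on sorted tables, so the worker should have left the + // existing base and delta directories untouched.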
+ FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + } + + @Test + public void minorTableWithBase() throws Exception { + LOG.debug("Starting minorTableWithBase"); + Table t = newTable("default", "mtwb", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(conf); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should now be four directories: the original base and deltas plus the new delta + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + for (int i = 0; i < stat.length; i++) LOG.debug("Found directory " + stat[i].getPath().toString()); + Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000021_0000024")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(208L, buckets[0].getLen()); + Assert.assertEquals(208L, buckets[1].getLen()); + } else { + LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void minorPartitionWithBase() throws Exception { + Table t = newTable("default", "mpwb", true); + Partition p = newPartition(t, "today"); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mpwb", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should still be four directories in the location. 
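+ // The original base and two deltas remain, plus the new delta_0000021_0000024 produced + // by the minor compaction.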
+ FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000021_0000024")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(208L, buckets[0].getLen()); + Assert.assertEquals(208L, buckets[1].getLen()); + } else { + LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void minorTableNoBase() throws Exception { + LOG.debug("Starting minorTableNoBase"); + Table t = newTable("default", "mtnb", false); + + HiveConf conf = new HiveConf(); + + addDeltaFile(conf, t, null, 1L, 2L, 2); + addDeltaFile(conf, t, null, 3L, 4L, 2); + + burnThroughTransactions(5); + + CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should now be three directories: the two original deltas and the new delta + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(3, stat.length); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000001_0000004")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(208L, buckets[0].getLen()); + Assert.assertEquals(208L, buckets[1].getLen()); + } else { + LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void majorTableWithBase() throws Exception { + LOG.debug("Starting majorTableWithBase"); + Table t = newTable("default", "matwb", false); + + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, null, 20L, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "matwb", CompactionType.MAJOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should now be four directories: the original base and deltas plus the new base + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new base file and make 
sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(1248L, buckets[0].getLen()); + Assert.assertEquals(1248L, buckets[1].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void majorPartitionWithBase() throws Exception { + LOG.debug("Starting majorPartitionWithBase"); + Table t = newTable("default", "mapwb", true); + Partition p = newPartition(t, "today"); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20); + addDeltaFile(conf, t, p, 21L, 22L, 2); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mapwb", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new base file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(1248L, buckets[0].getLen()); + Assert.assertEquals(1248L, buckets[1].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void majorTableNoBase() throws Exception { + LOG.debug("Starting majorTableNoBase"); + Table t = newTable("default", "matnb", false); + + HiveConf conf = new HiveConf(); + + addDeltaFile(conf, t, null, 1L, 2L, 2); + addDeltaFile(conf, t, null, 3L, 4L, 2); + + burnThroughTransactions(5); + + CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should now be three directories: the two original deltas and the new base + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + Assert.assertEquals(3, stat.length); + + // Find the new base file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if 
(stat[i].getPath().getName().equals("base_0000004")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(208L, buckets[0].getLen()); + Assert.assertEquals(208L, buckets[1].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void majorTableLegacy() throws Exception { + LOG.debug("Starting majorTableLegacy"); + Table t = newTable("default", "matl", false); + + HiveConf conf = new HiveConf(); + + addLegacyFile(conf, t, null, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // The original legacy files and deltas should still be present, along with the new base + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + //Assert.assertEquals(4, stat.length); + + // Find the new base file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(1248L, buckets[0].getLen()); + Assert.assertEquals(1248L, buckets[1].getLen()); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Test + public void minorTableLegacy() throws Exception { + LOG.debug("Starting minorTableLegacy"); + Table t = newTable("default", "mtl", false); + + HiveConf conf = new HiveConf(); + + addLegacyFile(conf, t, null, 20); + addDeltaFile(conf, t, null, 21L, 22L, 2); + addDeltaFile(conf, t, null, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // The original legacy files and deltas should still be present, along with the new delta + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); + + // Find the new delta file and make sure it has the right contents + boolean sawNewDelta = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("delta_0000021_0000024")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + 
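// Only the bucket names are verified here; bucket sizes are not asserted in the + // legacy-layout case. + 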
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewDelta); + } + + @Test + public void majorPartitionWithBaseMissingBuckets() throws Exception { + Table t = newTable("default", "mapwbmb", true); + Partition p = newPartition(t, "today"); + HiveConf conf = new HiveConf(); + + addBaseFile(conf, t, p, 20L, 20, 2, false); + addDeltaFile(conf, t, p, 21L, 22L, 2, 2, false); + addDeltaFile(conf, t, p, 23L, 24L, 2); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "mapwbmb", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(new HiveConf()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + // There should still be four directories in the location. + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(4, stat.length); + + // Find the new base file and make sure it has the right contents + boolean sawNewBase = false; + for (int i = 0; i < stat.length; i++) { + if (stat[i].getPath().getName().equals("base_0000024")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + // Bucket 0 should be small and bucket 1 should be large, make sure that's the case + Assert.assertTrue( + ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen() + && "bucket_00001".equals(buckets[1].getPath().getName()) && 1248L == buckets[1].getLen()) + || + ("bucket_00000".equals(buckets[1].getPath().getName()) && 104L == buckets[1].getLen() + && "bucket_00001".equals(buckets[0].getPath().getName()) && 1248L == buckets[0].getLen()) + ); + } else { + LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + } + + @Before + public void setUpTxnDb() throws Exception { + TxnDbUtil.setConfValues(new HiveConf()); + } +}