diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8bff2a9..2ff3297 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -33,6 +33,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -385,6 +386,9 @@ METADATA_EXPORT_LOCATION("hive.metadata.export.location", ""), MOVE_EXPORTED_METADATA_TO_TRASH("hive.metadata.move.exported.metadata.to.trash", true), + METASTORE_RETENTION_INTERVAL("hive.metastore.retention.interval", "0s", new TimeValidator()), + METASTORE_RETENTION_DATABASES("hive.metastore.retention.databases", ""), + // CLI CLIIGNOREERRORS("hive.cli.errors.ignore", false), CLIPRINTCURRENTDB("hive.cli.print.current.db", false), @@ -1255,6 +1259,53 @@ public void setIntVar(ConfVars var, int val) { setIntVar(this, var, val); } + public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) { + assert (var.valClass == String.class) : var.varname; + return toTime(getVar(conf, var), outUnit); + } + + public long getTimeVar(ConfVars var, TimeUnit outUnit) { + String value = getVar(this, var); + return toTime(value, outUnit); + } + + public static long toTime(String value, TimeUnit outUnit) { + String[] parsed = parseTime(value.trim()); + return toTime(parsed[0].trim(), parsed[1].trim(), outUnit); + } + + private static String[] parseTime(String value) { + char[] chars = value.toCharArray(); + int i = 0; + for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) { + } + return new String[] {value.substring(0, i), value.substring(i)}; + } + + public static long toTime(String timePart, String unitPart, TimeUnit outUnit) { + return outUnit.convert(Long.valueOf(timePart), unitFor(unitPart)); + } + + public static TimeUnit unitFor(String unit) { + unit = unit.toLowerCase(); + if (unit.equals("d") || unit.startsWith("day")) { + return TimeUnit.DAYS; + } else if (unit.equals("h") || unit.startsWith("hour")) { + return TimeUnit.HOURS; + } else if (unit.equals("m") || unit.startsWith("min")) { + return TimeUnit.MINUTES; + } else if (unit.equals("s") || unit.startsWith("sec")) { + return TimeUnit.SECONDS; + } else if (unit.equals("ms") || unit.startsWith("msec")) { + return TimeUnit.MILLISECONDS; + } else if (unit.equals("us") || unit.startsWith("usec")) { + return TimeUnit.MICROSECONDS; + } else if (unit.equals("ns") || unit.startsWith("nsec")) { + return TimeUnit.NANOSECONDS; + } + throw new IllegalArgumentException("Invalid time unit " + unit); + } + public static long getLongVar(Configuration conf, ConfVars var) { assert (var.valClass == Long.class) : var.varname; return conf.getLong(var.varname, var.defaultLongVal); @@ -1667,6 +1718,38 @@ public String validate(String value) { } } + public static class TimeValidator implements Validator { + + private TimeUnit timeUnit = TimeUnit.SECONDS; + private Long min; + private Long max; + + public TimeValidator() { + } + + public TimeValidator(Long min, Long max, TimeUnit timeUnit) { + this.timeUnit = timeUnit; + this.min = min; + this.max = max; + } + + @Override + public String validate(String value) { + try { + long time = toTime(value, timeUnit); + if (min != null && time < min) { + return value + " is smaller than " + min + " " + timeUnit.name().toLowerCase(); + } + if (max != null && time > max) { + return value + " 
is bigger than " + max + " " + timeUnit.name().toLowerCase();
+        }
+      } catch (Exception e) {
+        return e.toString();
+      }
+      return null;
+    }
+  }
+
   /**
    * Append comma separated list of config vars to the restrict List
    * @param restrictListStr
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 4944dfc..32c0738 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -284,6 +284,18 @@
+<property>
+  <name>hive.metastore.retention.interval</name>
+  <value>0s</value>
+  <description>Check interval for retention. A non-positive value disables retention checking.</description>
+</property>
+
+<property>
+  <name>hive.metastore.retention.databases</name>
+  <value></value>
+  <description>Comma-separated list of database names whose tables/partitions are subject to retention. An empty string means all databases.</description>
+</property>
+
 <property>
   <name>hive.metastore.partition.name.whitelist.pattern</name>
   <description>Partition names will be checked against this regex pattern and rejected if not matched.</description>
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 130fd67..d7efd80 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -130,7 +130,7 @@ public void testNameMethods() {
   /**
    * tests create table and partition and tries to drop the table without
-   * droppping the partition
+   * dropping the partition
    *
    * @throws Exception
    */
@@ -270,7 +270,7 @@ public static void partitionTester(HiveMetaStoreClient client, HiveConf hiveConf
       adjust(client, part2, dbName, tblName);
       adjust(client, part3, dbName, tblName);
     }
-    assertTrue("Partitions are not same", part.equals(part_get));
+    assertTrue("Partitions are not the same, expected: " + part + ", actual: " + part_get, part.equals(part_get));
 
     String partName = "ds=" + FileUtils.escapePathName("2008-07-01 14:13:12") + "/hr=14";
     String part2Name = "ds=" + FileUtils.escapePathName("2008-07-01 14:13:12") + "/hr=15";
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetention.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetention.java
new file mode 100644
index 0000000..f2a7629
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetention.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestRetention {
+
+  private static HiveConf hiveConf;
+  private static Hive db;
+
+  @BeforeClass
+  public static void start() throws Exception {
+
+    System.setProperty("hive.metastore.retention.interval", "1s");
+    int port = MetaStoreUtils.findFreePort();
+    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge());
+    hiveConf = new HiveConf(TestRetention.class);
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERATTEMPTS, 2);
+    hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, 0);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF, false);
+
+    db = Hive.get(hiveConf);
+  }
+
+  @AfterClass
+  public static void stop() {
+    Hive.closeCurrent();
+  }
+
+  @Test
+  public void testTableRetention() throws Throwable {
+
+    String tableName = "default.test_table";
+
+    db.dropTable(tableName);
+    Table tbl = db.newTable(tableName);
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "string", "comment1"));
+    cols.add(new FieldSchema("col2", "string", "comment2"));
+    tbl.setFields(cols);
+
+    tbl.setRetention(10); // seconds
+
+    db.createTable(tbl);
+
+    try {
+      for (int i = 0; i < 12; i++) {
+        Thread.sleep(1000);
+        try {
+          db.getTable(tableName);
+        } catch (InvalidTableException e) {
+          Assert.assertTrue("time index " + i, i >= 10);
+          return;
+        }
+      }
+      throw new Exception("Retention failed");
+    } finally {
+      db.dropTable(tableName);
+    }
+  }
+
+  @Test
+  public void testPartitionRetention() throws Throwable {
+
+    String tableName = "default.test_table";
+
+    db.dropTable(tableName);
+    Table tbl = db.newTable(tableName);
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "string", "comment1"));
+    cols.add(new FieldSchema("col2", "string", "comment2"));
+    tbl.setFields(cols);
+
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    partCols.add(new FieldSchema("pcol", "string", "comment3"));
+    tbl.setPartCols(partCols);
+
+    tbl.setRetention(10); // seconds
+
+    db.createTable(tbl);
+
+    try {
+      Map<String, String> partSpec = new LinkedHashMap<String, String>();
+      partSpec.put("pcol", "v1");
+      db.createPartition(tbl, partSpec);
+
+      for (int i = 0; i < 12; i++) {
+        Thread.sleep(1000);
+        Partition partition = db.getPartition(tbl, partSpec, false);
+        if (partition == null) {
+          Assert.assertTrue("time index " + i, i >= 10);
+          return;
+        }
+      }
+      throw new Exception("Retention failed");
+    } finally {
+      db.dropTable(tableName);
+    }
+  }
+}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index acef599..c0211b0 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -42,6 +42,8 @@ import
java.util.Properties; import java.util.Set; import java.util.Timer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -163,6 +165,7 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap; import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege; import org.apache.hadoop.hive.metastore.model.MTablePrivilege; +import org.apache.hadoop.hive.metastore.retention.RetentionProcessor; import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -5075,12 +5078,7 @@ public void run() { } }); - Lock startLock = new ReentrantLock(); - Condition startCondition = startLock.newCondition(); - MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer(); - startMetaStoreThreads(conf, startLock, startCondition, startedServing); - startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock, - startCondition, startedServing); + startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf); } catch (Throwable t) { // Catch the exception, log it and rethrow it. HMSHandler.LOG @@ -5098,19 +5096,7 @@ public void run() { */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge) throws Throwable { - startMetaStore(port, bridge, new HiveConf(HMSHandler.class), null, null, null); - } - - /** - * Start the metastore store. - * @param port - * @param bridge - * @param conf - * @throws Throwable - */ - public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, - HiveConf conf) throws Throwable { - startMetaStore(port, bridge, conf, null, null, null); + startMetaStore(port, bridge, new HiveConf(HMSHandler.class)); } /** @@ -5123,12 +5109,17 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, * @throws Throwable */ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, - HiveConf conf, Lock startLock, Condition startCondition, - MetaStoreThread.BooleanPointer startedServing) throws Throwable { + HiveConf conf) throws Throwable { + + Lock startLock = new ReentrantLock(); + Condition startCondition = startLock.newCondition(); + AtomicBoolean startedServing = new AtomicBoolean(); + startMetaStoreThreads(conf, startLock, startCondition, startedServing); + try { isMetaStoreRemote = true; // Server will create new threads up to max as necessary. After an idle - // period, it will destory threads to keep the number of threads in the + // period, it will destroy threads to keep the number of threads in the // pool to min. 
int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS); int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS); @@ -5193,9 +5184,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, + maxWorkerThreads); HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive); - if (startLock != null) { - signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing); - } + signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing); tServer.serve(); } catch (Throwable x) { x.printStackTrace(); @@ -5206,7 +5195,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, final Condition startCondition, - final MetaStoreThread.BooleanPointer startedServing) { + final AtomicBoolean startedServing) { // A simple thread to wait until the server has started and then signal the other threads to // begin Thread t = new Thread() { @@ -5221,7 +5210,7 @@ public void run() { } while (!server.isServing()); startLock.lock(); try { - startedServing.boolVal = true; + startedServing.set(true); startCondition.signalAll(); } finally { startLock.unlock(); @@ -5237,7 +5226,7 @@ public void run() { */ private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock, final Condition startCondition, final - MetaStoreThread.BooleanPointer startedServing) { + AtomicBoolean startedServing) { // A thread is spun up to start these other threads. That's because we can't start them // until after the TServer has started, but once TServer.serve is called we aren't given back // control. @@ -5247,7 +5236,7 @@ public void run() { // This is a massive hack. The compactor threads have to access packages in ql (such as // AcidInputFormat). ql depends on metastore so we can't directly access those. To deal // with this the compactor thread classes have been put in ql and they are instantiated here - // dyanmically. This is not ideal but it avoids a massive refactoring of Hive packages. + // dynamically. This is not ideal but it avoids a massive refactoring of Hive packages. // // Wrap the start of the threads in a catch Throwable loop so that any failures // don't doom the rest of the metastore. @@ -5255,10 +5244,11 @@ public void run() { try { // Per the javadocs on Condition, do not depend on the condition alone as a start gate // since spurious wake ups are possible. 
- while (!startedServing.boolVal) startCondition.await(); + while (!startedServing.get()) startCondition.await(); startCompactorInitiator(conf); startCompactorWorkers(conf); startCompactorCleaner(conf); + startRetentionProcessor(conf); } catch (Throwable e) { LOG.error("Failure when starting the compactor, compactions may not happen, " + StringUtils.stringifyException(e)); @@ -5308,6 +5298,13 @@ private static MetaStoreThread instantiateThread(String classname) throws Except } } + private static void startRetentionProcessor(HiveConf conf) throws Exception { + if (HiveConf.getTimeVar(conf, + HiveConf.ConfVars.METASTORE_RETENTION_INTERVAL, TimeUnit.SECONDS) > 0) { + initializeAndStartThread(new RetentionProcessor(), conf); + } + } + private static int nextThreadId = 1000000; private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws @@ -5315,7 +5312,7 @@ private static void initializeAndStartThread(MetaStoreThread thread, HiveConf co LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.setHiveConf(conf); thread.setThreadId(nextThreadId++); - thread.init(new MetaStoreThread.BooleanPointer()); + thread.init(new AtomicBoolean(false)); thread.start(); } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java index 6e18a5b..42f64b2 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -20,6 +20,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; +import java.util.concurrent.atomic.AtomicBoolean; + /** * A thread that runs in the metastore, separate from the threads in the thrift service. */ @@ -44,21 +46,12 @@ * @param stop a flag to watch for when to stop. If this value is set to true, * the thread will terminate the next time through its main loop. */ - void init(BooleanPointer stop) throws MetaException; + void init(AtomicBoolean stop) throws MetaException; /** * Run the thread in the background. This must not be called until - * {@link #init(org.apache.hadoop.hive.metastore.MetaStoreThread.BooleanPointer)} has + * {@link #init(java.util.concurrent.atomic.AtomicBoolean)} has * been called. 
 */
  void start();
-
-  class BooleanPointer {
-    public boolean boolVal;
-
-    public BooleanPointer() {
-      boolVal = false;
-    }
-  }
-
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 911c997..81207b2 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -128,6 +128,7 @@
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
 import org.apache.hadoop.hive.metastore.parser.FilterLexer;
 import org.apache.hadoop.hive.metastore.parser.FilterParser;
+import org.apache.hadoop.hive.metastore.retention.RetentionTarget;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
@@ -909,6 +910,69 @@ private MTable getMTable(String db, String table) {
     return mtbl;
   }
 
+  private int nowInSeconds() {
+    return (int) (System.currentTimeMillis() / 1000);
+  }
+
+  public List<RetentionTarget> getRetentionTargets(String[] databases) throws MetaException {
+    boolean committed = false;
+    int current = nowInSeconds();
+    List<RetentionTarget> targets = new ArrayList<RetentionTarget>();
+    try {
+      openTransaction();
+      for (Object t : getTableRetentions(databases, current)) {
+        targets.add(new RetentionTarget((MTable) t));
+      }
+      for (Object p : getPartitionRetentions(databases, current)) {
+        targets.add(new RetentionTarget((MPartition) p));
+      }
+      committed = commitTransaction();
+    } finally {
+      if (!committed) {
+        rollbackTransaction();
+      }
+    }
+    return targets;
+  }
+
+  private Collection getTableRetentions(String[] databases, int current) {
+    Query query = pm.newQuery(MTable.class);
+    StringBuilder builder = new StringBuilder();
+    builder.append("partitionKeys == null && ");
+    builder.append("retention > 0 && createTime + retention < " + current);
+    if (databases != null && databases.length > 0) {
+      builder.append(" && database.name in (");
+      for (int i = 0; i < databases.length; i++) {
+        if (i > 0) {
+          builder.append(", ");
+        }
+        builder.append("\"" + databases[i].trim() + "\"");
+      }
+      builder.append(")");
+    }
+    query.setFilter(builder.toString());
+    return (Collection) query.execute();
+  }
+
+  private Collection getPartitionRetentions(String[] databases, int current) {
+    Query query = pm.newQuery(MPartition.class);
+    StringBuilder builder = new StringBuilder();
+    builder.append("table.partitionKeys != null && ");
+    builder.append("table.retention > 0 && createTime + table.retention < " + current);
+    if (databases != null && databases.length > 0) {
+      builder.append(" && table.database.name in (");
+      for (int i = 0; i < databases.length; i++) {
+        if (i > 0) {
+          builder.append(", ");
+        }
+        builder.append("\"" + databases[i].trim() + "\"");
+      }
+      builder.append(")");
+    }
+    query.setFilter(builder.toString());
+    return (Collection) query.execute();
+  }
+
   @Override
   public List<Table> getTableObjectsByName(String db, List<String> tbl_names)
       throws MetaException, UnknownDBException {
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index e0de0e0..038471d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.metastore.model.MRoleMap;
 import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
+import org.apache.hadoop.hive.metastore.retention.RetentionTarget;
 import org.apache.thrift.TException;
 
 public interface RawStore extends Configurable {
@@ -547,4 +548,11 @@ public void dropFunction(String dbName, String funcName)
    */
   public List<String> getFunctions(String dbName, String pattern) throws MetaException;
 
+  /**
+   * Retrieve the tables and partitions whose retention period has expired.
+   * @param databases database names to check; null means all databases
+   * @return the expired retention targets
+   * @throws MetaException
+   */
+  public List<RetentionTarget> getRetentionTargets(String[] databases) throws MetaException;
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 9e8d912..2bc2efc 100755
--- metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.model.MFieldSchema;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -485,6 +486,16 @@ public static String makePartName(List<FieldSchema> partCols,
     return makePartName(partCols, vals, null);
   }
 
+  public static String fieldsToPartName(List<MFieldSchema> partCols,
+      List<String> vals) throws MetaException {
+    return fieldsToPartName(partCols, vals, null);
+  }
+
+  public static String stringToPartName(List<String> partCols,
+      List<String> vals) throws MetaException {
+    return stringToPartName(partCols, vals, null);
+  }
+
   /**
    * @param desc
    * @return array of FileStatus objects corresponding to the files
@@ -528,12 +539,27 @@ public static String makePartName(List<FieldSchema> partCols,
    * @return An escaped, valid partition name.
    * @throws MetaException
    */
-  public static String makePartName(List<FieldSchema> partCols,
+  public static String makePartName(List<FieldSchema> partSchema,
       List<String> vals, String defaultStr) throws MetaException {
+    return toPartName(partSchema, vals, defaultStr, FIELD);
+  }
+
+  public static String fieldsToPartName(List<MFieldSchema> partSchema,
+      List<String> vals, String defaultStr) throws MetaException {
+    return toPartName(partSchema, vals, defaultStr, MFIELD);
+  }
+
+  public static String stringToPartName(List<String> partSchema,
+      List<String> vals, String defaultStr) throws MetaException {
+    return toPartName(partSchema, vals, defaultStr, STRING);
+  }
+
+  private static <T> String toPartName(List<T> partCols,
+      List<String> vals, String defaultStr, NameAccess<T> access) throws MetaException {
     if ((partCols.size() != vals.size()) || (partCols.size() == 0)) {
       String errorStr = "Invalid partition key & values; keys [";
-      for (FieldSchema fs : partCols) {
-        errorStr += (fs.getName() + ", ");
+      for (T col : partCols) {
+        errorStr += access.getName(col) + ", ";
       }
       errorStr += "], values [";
       for (String val : vals) {
@@ -542,8 +568,8 @@ public static String makePartName(List<FieldSchema> partCols,
       throw new MetaException(errorStr + "]");
     }
     List<String> colNames = new ArrayList<String>();
-    for (FieldSchema col : partCols) {
-      colNames.add(col.getName());
+    for (T col : partCols) {
+      colNames.add(access.getName(col));
     }
     return FileUtils.makePartName(colNames, vals, defaultStr);
   }
@@ -556,4 +582,17 @@ public static String makePartName(List<FieldSchema> partCols,
     return values;
   }
 
+  private static interface NameAccess<T> {
+    String getName(T column);
+  }
+
+  private static final NameAccess<String> STRING = new NameAccess<String>() {
+    public String getName(String column) { return column; }
+  };
+  private static final NameAccess<FieldSchema> FIELD = new NameAccess<FieldSchema>() {
+    public String getName(FieldSchema column) { return column.getName(); }
+  };
+  private static final NameAccess<MFieldSchema> MFIELD = new NameAccess<MFieldSchema>() {
+    public String getName(MFieldSchema column) { return column.getName(); }
+  };
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/retention/RetentionProcessor.java metastore/src/java/org/apache/hadoop/hive/metastore/retention/RetentionProcessor.java
new file mode 100644
index 0000000..1d93486
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/retention/RetentionProcessor.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.retention;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_RETENTION_DATABASES;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_RETENTION_INTERVAL;
+
+public class RetentionProcessor extends Thread implements MetaStoreThread {
+
+  private static final Log LOG = LogFactory.getLog(RetentionProcessor.class);
+
+  private HiveConf conf;
+  private int threadId;
+  private AtomicBoolean stop;
+  private RawStore rs;
+
+  private long interval;
+  private String[] databases;
+
+  @Override
+  public void setHiveConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void setThreadId(int threadId) {
+    this.threadId = threadId;
+  }
+
+  @Override
+  public void init(AtomicBoolean stop) throws MetaException {
+    this.stop = stop;
+    this.rs = RawStoreProxy.getProxy(conf, conf,
+        conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId);
+    interval = HiveConf.getTimeVar(conf, METASTORE_RETENTION_INTERVAL, TimeUnit.MILLISECONDS);
+    String databaseNames = HiveConf.getVar(conf, METASTORE_RETENTION_DATABASES).trim();
+    databases = databaseNames.isEmpty() ? null : databaseNames.split(",");
+    setName("Retention [" + TimeUnit.SECONDS.convert(interval, TimeUnit.MILLISECONDS) + "]");
+    setPriority(MIN_PRIORITY);
+    setDaemon(true);
+  }
+
+  @Override
+  public void run() {
+    while (!stop.get()) {
+      try {
+        Thread.sleep(interval);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+      try {
+        checkTTL(databases);
+      } catch (MetaException e) {
+        LOG.warn("Failed to access metastore", e);
+      }
+    }
+  }
+
+  private void checkTTL(String[] databases) throws MetaException {
+    for (RetentionTarget target : rs.getRetentionTargets(databases)) {
+      String name = target.getName();
+      String retention = target.retention + " seconds";
+      if (target.retention > 60) {
+        if (target.retention > 60 * 60) {
+          if (target.retention > 60 * 60 * 24) {
+            retention += " (about " + target.retention / 60 / 60 / 24 + "+ days)";
+          } else {
+            retention += " (about " + target.retention / 60 / 60 + "+ hours)";
+          }
+        } else {
+          retention += " (about " + target.retention / 60 + "+ minutes)";
+        }
+      }
+      LOG.warn("Dropping " + name + " by retention policy (Created: "
+          + new Date(target.createTime * 1000L) + ", Retention: " + retention + ")");
+      try {
+        if (target.partValues == null) {
+          rs.dropTable(target.databaseName, target.tableName);
+        } else {
+          rs.dropPartition(target.databaseName, target.tableName, target.partValues);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to drop " + name + " (retention)", e);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/retention/RetentionTarget.java metastore/src/java/org/apache/hadoop/hive/metastore/retention/RetentionTarget.java
new file mode 100644
index 0000000..fcf85ed
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/retention/RetentionTarget.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.retention;
+
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.model.MPartition;
+import org.apache.hadoop.hive.metastore.model.MTable;
+
+import java.util.List;
+
+public class RetentionTarget {
+
+  String databaseName;
+  String tableName;
+  List<String> partValues;
+  String partName;
+  int createTime;
+  int retention;
+
+  public RetentionTarget(MTable table) {
+    this.databaseName = table.getDatabase().getName();
+    this.tableName = table.getTableName();
+    this.createTime = table.getCreateTime();
+    this.retention = table.getRetention();
+  }
+
+  public RetentionTarget(MPartition partition) throws MetaException {
+    this.databaseName = partition.getTable().getDatabase().getName();
+    this.tableName = partition.getTable().getTableName();
+    this.partValues = partition.getValues();
+    this.partName = Warehouse.fieldsToPartName(partition.getTable().getPartitionKeys(), partValues);
+    this.createTime = partition.getCreateTime();
+    this.retention = partition.getTable().getRetention();
+  }
+
+  public String getName() {
+    if (partValues == null) {
+      return "table " + databaseName + "." + tableName;
+    }
+    return "partition " + databaseName + "." + tableName + "." + partName;
+  }
+
+  public String toString() {
+    return getName() + "[" + createTime + ":" + retention + "]";
+  }
+}
diff --git metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 5c00aa1..00709aa 100644
--- metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -52,6 +53,7 @@
 import org.apache.hadoop.hive.metastore.model.MRoleMap;
 import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
+import org.apache.hadoop.hive.metastore.retention.RetentionTarget;
 import org.apache.thrift.TException;
 
 /**
@@ -709,5 +711,10 @@ public Function getFunction(String dbName, String funcName)
     return objectStore.getFunctions(dbName, pattern);
   }
 
+  @Override
+  public List<RetentionTarget> getRetentionTargets(String[] databases) throws MetaException {
+    return Collections.emptyList();
+  }
+
 }
diff --git metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 5025b83..2c02e0a 100644
--- metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -53,6 +54,7 @@
 import org.apache.hadoop.hive.metastore.model.MRoleMap;
 import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
+import org.apache.hadoop.hive.metastore.retention.RetentionTarget;
 import org.apache.thrift.TException;
 
 /**
@@ -726,6 +728,11 @@ public Function getFunction(String dbName, String funcName)
     return null;
   }
 
+  @Override
+  public List<RetentionTarget> getRetentionTargets(String[] databases) throws MetaException {
+    return Collections.emptyList();
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4d8e10c..56cbd48 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3722,6 +3722,10 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
       while (keyItr.hasNext()) {
         tbl.getTTable().getParameters().remove(keyItr.next());
       }
+    } else if (alterTbl.getOp() == AlterTableTypes.SETRETENTION) {
+      tbl.getTTable().setRetention(alterTbl.getRetentionSeconds());
+    } else if (alterTbl.getOp() == AlterTableTypes.UNSETRETENTION) {
+      tbl.getTTable().setRetention(0);
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDEPROPS) {
       if (part != null) {
         part.getTPartition().getSd().getSerdeInfo().getParameters().putAll(
@@ -4036,9 +4040,12 @@ private void dropTable(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveExc
    * HiveConf of session
    */
  private boolean updateModifiedParameters(Map<String, String> params, HiveConf conf) throws HiveException {
-    String user = null;
-    user = SessionState.getUserFromAuthenticator();
-    params.put("last_modified_by", user);
+    String user = SessionState.getUserFromAuthenticator();
+    if (user != null) {
+      params.put("last_modified_by", user);
+    } else {
+      params.remove("last_modified_by");
+    }
     params.put("last_modified_time", Long.toString(System.currentTimeMillis() / 1000));
     return true;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 26836b6..05f7823 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -167,6 +167,7 @@ public static WriteType determineAlterTableWriteType(AlterTableDesc.AlterTableTy
     case ADDFILEFORMAT:
     case ADDSERDE:
     case DROPPROPS:
+    case UNSETRETENTION:
     case REPLACECOLS:
     case ARCHIVE:
     case UNARCHIVE:
@@ -184,6 +185,7 @@ public static WriteType determineAlterTableWriteType(AlterTableDesc.AlterTableTy
 
     case ADDPARTITION:
     case ADDSERDEPROPS:
+    case SETRETENTION:
     case ADDPROPS:
       return WriteType.DDL_SHARED;
     case COMPACT:
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 640b6b3..22e1223 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -36,6 +36,7 @@
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.antlr.runtime.tree.CommonTree;
 import org.antlr.runtime.tree.Tree;
@@ -57,7 +58,6 @@
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
-import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -406,6 +406,12 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     case HiveParser.TOK_DROPTABLE_PROPERTIES:
       analyzeAlterTableProps(ast, false, true);
       break;
+    case HiveParser.TOK_ALTERTABLE_RETENTION:
+      analyzeAlterTableRetention(ast, false);
+      break;
+    case HiveParser.TOK_DROPTABLE_RETENTION:
+      analyzeAlterTableRetention(ast, true);
+      break;
     case HiveParser.TOK_ALTERINDEX_REBUILD:
       analyzeAlterIndexRebuild(ast);
       break;
@@ -1277,6 +1283,30 @@ private void analyzeAlterTableProps(ASTNode ast, boolean expectView, boolean isU
         alterTblDesc), conf));
   }
 
+  private void analyzeAlterTableRetention(ASTNode ast, boolean isUnset) throws SemanticException {
+
+    String tableName = getUnescapedName((ASTNode) ast.getChild(0));
+    AlterTableDesc alterTblDesc = null;
+    if (isUnset) {
+      alterTblDesc = new AlterTableDesc(AlterTableTypes.UNSETRETENTION);
+    } else {
+      String time = ast.getChild(1).getText();
+      String unit = ast.getChild(2).getText();
+      long seconds = HiveConf.toTime(time, unit, TimeUnit.SECONDS);
+      if (seconds > Integer.MAX_VALUE) {
+        throw new SemanticException("Retention too large: " + seconds + " seconds (max " + Integer.MAX_VALUE + ")");
+      }
+      alterTblDesc = new AlterTableDesc(AlterTableTypes.SETRETENTION);
+      alterTblDesc.setRetentionSeconds((int) seconds);
+    }
+    alterTblDesc.setOldName(tableName);
+
+    addInputsOutputsAlterTable(tableName, null, alterTblDesc);
+
+    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
+        alterTblDesc), conf));
+  }
+
   private void analyzeAlterTableSerdeProps(ASTNode ast, String tableName,
       HashMap<String, String> partSpec) throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index 412a046..282e9de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -195,6 +195,7 @@ KW_DBPROPERTIES: 'DBPROPERTIES'; KW_LIMIT: 'LIMIT'; KW_SET: 'SET'; KW_UNSET: 'UNSET'; +KW_RETENTION: 'RETENTION'; KW_TBLPROPERTIES: 'TBLPROPERTIES'; KW_IDXPROPERTIES: 'IDXPROPERTIES'; KW_VALUE_TYPE: '$VALUE$'; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index f934ac4..4651190 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -127,6 +127,7 @@ TOK_CREATEINDEX_INDEXTBLNAME; TOK_DEFERRED_REBUILDINDEX; TOK_DROPINDEX; TOK_DROPTABLE_PROPERTIES; +TOK_DROPTABLE_RETENTION; TOK_LIKETABLE; TOK_DESCTABLE; TOK_DESCFUNCTION; @@ -150,6 +151,7 @@ TOK_TABLE_PARTITION; TOK_ALTERTABLE_FILEFORMAT; TOK_ALTERTABLE_LOCATION; TOK_ALTERTABLE_PROPERTIES; +TOK_ALTERTABLE_RETENTION; TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION; TOK_ALTERINDEX_REBUILD; TOK_ALTERINDEX_PROPERTIES; @@ -385,6 +387,7 @@ import java.util.HashMap; xlateMap.put("KW_PARTITIONS", "PARTITIONS"); xlateMap.put("KW_TABLE", "TABLE"); xlateMap.put("KW_TABLES", "TABLES"); + xlateMap.put("KW_RETENTION", "RETENTION"); xlateMap.put("KW_TBLPROPERTIES", "TBLPROPERTIES"); xlateMap.put("KW_SHOW", "SHOW"); xlateMap.put("KW_MSCK", "MSCK"); @@ -946,6 +949,7 @@ alterTableStatementSuffix | alterStatementSuffixArchive | alterStatementSuffixUnArchive | alterStatementSuffixProperties + | alterStatementSuffixRetention | alterTblPartitionStatement | alterStatementSuffixSkewedby | alterStatementSuffixExchangePartition @@ -1093,6 +1097,15 @@ alterStatementSuffixProperties -> ^(TOK_DROPTABLE_PROPERTIES $name tableProperties ifExists?) 
; +alterStatementSuffixRetention +@init { pushMsg("alter retention statement", state); } +@after { popMsg(state); } + : identifier KW_SET KW_RETENTION Number Identifier + -> ^(TOK_ALTERTABLE_RETENTION identifier Number Identifier) + | identifier KW_UNSET KW_RETENTION + -> ^(TOK_DROPTABLE_RETENTION identifier) + ; + alterViewSuffixProperties @init { pushMsg("alter view properties statement", state); } @after { popMsg(state); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index b6f3748..9b43d61 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -180,6 +180,8 @@ public static BaseSemanticAnalyzer get(HiveConf conf, ASTNode tree) case HiveParser.TOK_ALTERTABLE_ADDPARTS: case HiveParser.TOK_ALTERTABLE_PROPERTIES: case HiveParser.TOK_DROPTABLE_PROPERTIES: + case HiveParser.TOK_ALTERTABLE_RETENTION: + case HiveParser.TOK_DROPTABLE_RETENTION: case HiveParser.TOK_ALTERTABLE_SERIALIZER: case HiveParser.TOK_ALTERTABLE_SERDEPROPERTIES: case HiveParser.TOK_ALTERTABLE_PARTCOLTYPE: diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java index 20d863b..b097ad0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java @@ -48,7 +48,8 @@ ADDFILEFORMAT, ADDCLUSTERSORTCOLUMN, RENAMECOLUMN, ADDPARTITION, TOUCH, ARCHIVE, UNARCHIVE, ALTERPROTECTMODE, ALTERPARTITIONPROTECTMODE, ALTERLOCATION, DROPPARTITION, RENAMEPARTITION, ADDSKEWEDBY, ALTERSKEWEDLOCATION, - ALTERBUCKETNUM, ALTERPARTITION, COMPACT + ALTERBUCKETNUM, ALTERPARTITION, COMPACT, + SETRETENTION, UNSETRETENTION } public static enum ProtectModeType { @@ -89,6 +90,8 @@ boolean isDropIfExists = false; boolean isTurnOffSorting = false; + int retentionSeconds; + public AlterTableDesc() { } @@ -702,5 +705,13 @@ public boolean getIsDropIfExists() { return isDropIfExists; } + public int getRetentionSeconds() { + return retentionSeconds; + } + + public void setRetentionSeconds(int retentionSeconds) { + this.retentionSeconds = retentionSeconds; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 18bb2c0..042153d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -85,13 +85,13 @@ public void run() { // Now, go back to bed until it's time to do this again long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= cleanerCheckInterval || stop.boolVal) continue; + if (elapsedTime >= cleanerCheckInterval || stop.get()) continue; else Thread.sleep(cleanerCheckInterval - elapsedTime); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException(t)); } - } while (!stop.boolVal); + } while (!stop.get()); } private void clean(CompactionInfo ci) throws MetaException { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 715f9c0..3d108d5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ 
-32,7 +32,6 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -40,6 +39,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * Superclass for all threads in the compactor. @@ -52,7 +52,7 @@ protected CompactionTxnHandler txnHandler; protected RawStore rs; protected int threadId; - protected BooleanPointer stop; + protected AtomicBoolean stop; @Override public void setHiveConf(HiveConf conf) { @@ -66,7 +66,7 @@ public void setThreadId(int threadId) { } @Override - public void init(BooleanPointer stop) throws MetaException { + public void init(AtomicBoolean stop) throws MetaException { this.stop = stop; setPriority(MIN_PRIORITY); setDaemon(true); // this means the process will exit without waiting for this thread diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 3211759..f6a754d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -43,6 +43,7 @@ import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * A class to initiate compactions. This will run in a separate thread. @@ -126,10 +127,10 @@ public void run() { } long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= checkInterval || stop.boolVal) continue; + if (elapsedTime >= checkInterval || stop.get()) continue; else Thread.sleep(checkInterval - elapsedTime); - } while (!stop.boolVal); + } while (!stop.get()); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + StringUtils.stringifyException(t)); @@ -137,7 +138,7 @@ public void run() { } @Override - public void init(BooleanPointer stop) throws MetaException { + public void init(AtomicBoolean stop) throws MetaException { super.init(stop); checkInterval = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index f464df8..ac2fa7c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -27,13 +27,13 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import java.net.InetAddress; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicBoolean; /** * A class to do compactions. This will run in a separate thread. 
It will spin on the @@ -69,7 +69,7 @@ public void run() { do { CompactionInfo ci = txnHandler.findNextToCompact(name); - if (ci == null && !stop.boolVal) { + if (ci == null && !stop.get()) { try { Thread.sleep(SLEEP_TIME); continue; @@ -150,7 +150,7 @@ public Object run() throws Exception { ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e)); txnHandler.markCleaned(ci); } - } while (!stop.boolVal); + } while (!stop.get()); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + name + ", exiting " + StringUtils.stringifyException(t)); @@ -158,7 +158,7 @@ public Object run() throws Exception { } @Override - public void init(BooleanPointer stop) throws MetaException { + public void init(AtomicBoolean stop) throws MetaException { super.init(stop); StringBuffer name = new StringBuffer(hostname()); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 7f5134e..8755e4a 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -52,6 +51,7 @@ import java.util.Map; import java.util.Properties; import java.util.Stack; +import java.util.concurrent.atomic.AtomicBoolean; /** * Super class for all of the compactor test modules. 
@@ -64,7 +64,7 @@ protected IMetaStoreClient ms; protected long sleepTime = 1000; - private final MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); + private final AtomicBoolean stop = new AtomicBoolean(); private final File tmpdir; protected CompactorTest() throws Exception { @@ -227,7 +227,7 @@ private void startThread(char type, HiveConf conf) throws Exception { } t.setThreadId((int) t.getId()); t.setHiveConf(conf); - stop.boolVal = true; + stop.set(true); t.init(stop); t.run(); } diff --git ql/src/test/queries/clientpositive/alter_table_retention.q ql/src/test/queries/clientpositive/alter_table_retention.q new file mode 100644 index 0000000..31b20fe --- /dev/null +++ ql/src/test/queries/clientpositive/alter_table_retention.q @@ -0,0 +1,41 @@ +-- table +create table test_table (id int, query string, name string); +describe formatted test_table; + +alter table test_table set retention 120 sec; +describe formatted test_table; + +alter table test_table set retention 30 min; +describe formatted test_table; + +alter table test_table set retention 12 hours; +describe formatted test_table; + +alter table test_table set retention 7 days; +describe formatted test_table; + +alter table test_table unset retention; +describe formatted test_table; + +drop table test_table; + +-- partitioned table +create table test_table (id int, query string, name string) partitioned by (ds string, hr string); +describe formatted test_table; + +alter table test_table set retention 120 sec; +describe formatted test_table; + +alter table test_table set retention 30 min; +describe formatted test_table; + +alter table test_table set retention 12 hours; +describe formatted test_table; + +alter table test_table set retention 7 days; +describe formatted test_table; + +alter table test_table unset retention; +describe formatted test_table; + +drop table test_table; diff --git ql/src/test/results/clientpositive/alter_table_retention.q.out ql/src/test/results/clientpositive/alter_table_retention.q.out new file mode 100644 index 0000000..c7d3738 --- /dev/null +++ ql/src/test/results/clientpositive/alter_table_retention.q.out @@ -0,0 +1,564 @@ +PREHOOK: query: -- table +create table test_table (id int, query string, name string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- table +create table test_table (id int, query string, name string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_table +PREHOOK: query: describe formatted test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: alter table test_table set retention 120 sec +PREHOOK: type: null +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: 
alter table test_table set retention 120 sec
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 120
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE false
+#### A masked pattern was here ####
+ numFiles 0
+ numRows -1
+ rawDataSize -1
+ totalSize 0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 30 min
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 30 min
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 1800
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE false
+#### A masked pattern was here ####
+ numFiles 0
+ numRows -1
+ rawDataSize -1
+ totalSize 0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 12 hours
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 12 hours
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 43200
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE false
+#### A masked pattern was here ####
+ numFiles 0
+ numRows -1
+ rawDataSize -1
+ totalSize 0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 7 days
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 7 days
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 604800
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE false
+#### A masked pattern was here ####
+ numFiles 0
+ numRows -1
+ rawDataSize -1
+ totalSize 0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table unset retention
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table unset retention
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 0
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE false
+#### A masked pattern was here ####
+ numFiles 0
+ numRows -1
+ rawDataSize -1
+ totalSize 0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: drop table test_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: drop table test_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: -- partitioned table
+create table test_table (id int, query string, name string) partitioned by (ds string, hr string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: -- partitioned table
+create table test_table (id int, query string, name string) partitioned by (ds string, hr string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Partition Information
+# col_name data_type comment
+
+ds string
+hr string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 0
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 120 sec
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 120 sec
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Partition Information
+# col_name data_type comment
+
+ds string
+hr string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 120
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 30 min
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 30 min
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Partition Information
+# col_name data_type comment
+
+ds string
+hr string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 1800
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 12 hours
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 12 hours
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Partition Information
+# col_name data_type comment
+
+ds string
+hr string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 43200
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table set retention 7 days
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table set retention 7 days
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Partition Information
+# col_name data_type comment
+
+ds string
+hr string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 604800
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: alter table test_table unset retention
+PREHOOK: type: null
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: alter table test_table unset retention
+POSTHOOK: type: null
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
+PREHOOK: query: describe formatted test_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@test_table
+POSTHOOK: query: describe formatted test_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@test_table
+# col_name data_type comment
+
+id int
+query string
+name string
+
+# Partition Information
+# col_name data_type comment
+
+ds string
+hr string
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 0
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: drop table test_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@test_table
+PREHOOK: Output: default@test_table
+POSTHOOK: query: drop table test_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@test_table
+POSTHOOK: Output: default@test_table
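
For reference, the Retention values recorded in the golden output above follow directly from standard java.util.concurrent.TimeUnit conversions: 120 sec -> 120, 30 min -> 1800, 12 hours -> 43200, 7 days -> 604800 seconds, and unset retention -> 0. The following is a minimal, self-contained sketch of that normalization, illustrative only and not the patch's implementation; the parseRetentionSeconds helper and RetentionDemo class are hypothetical names introduced here for the example.

import java.util.concurrent.TimeUnit;

public class RetentionDemo {

    // Hypothetical helper: converts a "<number> <unit>" string, as used in
    // "alter table ... set retention 120 sec", into whole seconds.
    static long parseRetentionSeconds(String value) {
        String[] parts = value.trim().split("\\s+");
        long amount = Long.parseLong(parts[0]);
        String unit = parts[1].toLowerCase();
        TimeUnit tu;
        if (unit.startsWith("sec")) {
            tu = TimeUnit.SECONDS;
        } else if (unit.startsWith("min")) {
            tu = TimeUnit.MINUTES;
        } else if (unit.startsWith("hour")) {
            tu = TimeUnit.HOURS;
        } else if (unit.startsWith("day")) {
            tu = TimeUnit.DAYS;
        } else {
            throw new IllegalArgumentException("Invalid time unit " + unit);
        }
        return TimeUnit.SECONDS.convert(amount, tu);
    }

    public static void main(String[] args) {
        // Matches the Retention: values in the expected output above.
        System.out.println(parseRetentionSeconds("120 sec"));  // 120
        System.out.println(parseRetentionSeconds("30 min"));   // 1800
        System.out.println(parseRetentionSeconds("12 hours")); // 43200
        System.out.println(parseRetentionSeconds("7 days"));   // 604800
    }
}

The same conversions hold for both the unpartitioned and partitioned test tables: the retention duration is a table-level property, so partition columns (ds, hr) do not affect the stored value.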