diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 0c7c0d7..3509163 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -385,7 +385,6 @@ MOVE_EXPORTED_METADATA_TO_TRASH("hive.metadata.move.exported.metadata.to.trash", true), METASTORE_RETENTION_INTERVAL("hive.metastore.retention.interval.minutes", -1), - METASTORE_RETENTION_DEFAULT("hive.metastore.retention.default.minutes", -1), METASTORE_RETENTION_DATABASES("hive.metastore.retention.databases", ""), // CLI diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 7ce27da..695e7ce 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -231,58 +231,87 @@ public void setConf(Configuration conf) { } else { LOG.info("Initialized ObjectStore"); } - startTTLChecker(hiveConf); + startRetention(hiveConf); } finally { pmfPropLock.unlock(); } } - private static Thread ttlChecker; + private static Thread retention; - private void startTTLChecker(final Configuration conf) { - if (ttlChecker != null) { + private void startRetention(final Configuration conf) { + if (retention != null) { return; } final int interval = HiveConf.getIntVar(conf, METASTORE_RETENTION_INTERVAL); - final int retention = HiveConf.getIntVar(conf, METASTORE_RETENTION_DEFAULT); - String databaseNames = HiveConf.getVar(conf, METASTORE_RETENTION_DATABASES).trim(); - final String[] databases = databaseNames.isEmpty() ? null : databaseNames.split(","); if (interval < 0) { return; } - ttlChecker = new Thread("TTL-Checker") { + final String databaseNames = HiveConf.getVar(conf, METASTORE_RETENTION_DATABASES).trim(); + final String[] databases = databaseNames.isEmpty() ? 
null : databaseNames.split(","); + retention = new Thread("Retention [" + interval + "]") { public void run() { + int sleep = interval; while (true) { try { - Thread.sleep(TimeUnit.MINUTES.toMillis(interval)); + Thread.sleep(TimeUnit.MINUTES.toMillis(sleep)); } catch (InterruptedException e) { // ignore } - checkTTL(TimeUnit.MINUTES.toSeconds(retention), databases); + try { + if (checkTTL(databases) > 0) { + sleep = interval; + continue; + } + } catch (MetaException e) { + LOG.warn("Failed to access metastore", e); + } + sleep = Math.min(interval * 10, sleep << 1); } } }; - ttlChecker.setDaemon(true); - ttlChecker.start(); + retention.setDaemon(true); + retention.start(); } - private void checkTTL(long retention, String[] databases) { - try { - for (Table table : leastRecentlyAccessedTables(retention, databases, 3)) { - String tableName = table.getDbName() + "." + table.getTableName(); - long time = table.getRetention() > 0 ? table.getRetention() : retention; - LOG.warn("Dropping table " + tableName + - " by retention policy (Created: " + new Date(table.getCreateTime() * 1000l) + - " Retention: " + time + " seconds)"); - try { - dropTable(table.getDbName(), table.getTableName()); - } catch (Exception e) { - LOG.warn("Failed to drop table" + tableName, e); + private int checkTTL(String[] databases) throws MetaException { + Map targets = getRetentionTargets(databases); + for (Entry target : targets.entrySet()) { + String[] names = target.getKey(); + Integer[] base = target.getValue(); + + String name; + if (names.length == 2) { + name = "table " + names[0] + "." + names[1]; + } else { + name = "partition " + names[0] + "." + names[1] + "." 
+ names[2]; + } + String time = base[1] + " seconds"; + if (base[1] > 60) { + if (base[1] > 60 * 60) { + if (base[1] > 60 * 60 * 24) { + time += "(about " + base[1] / 60 / 60 / 24 + "+ days)"; + } else { + time += "(about " + base[1] / 60 / 60 + "+ hours)"; + } + } else { + time += "(about " + base[1] / 60 + "+ minutes)"; } } - } catch (MetaException e) { - LOG.warn("Failed to get table", e); + LOG.warn("Dropping " + name + " by retention policy (Created: " + + new Date(base[0] * 1000l) + ", Retention on: " + time + ")"); + try { + if (names.length == 2) { + dropTable(names[0], names[1]); + } else { + List values = Warehouse.getPartValuesFromPartName(names[2]); + dropPartition(names[0], names[1], values); + } + } catch (Exception e) { + LOG.warn("Failed to drop " + name + " (retention)", e); + } } + return targets.size(); } private ClassLoader classLoader; @@ -968,40 +997,26 @@ private int nowInSeconds() { return (int) (System.currentTimeMillis() / 1000); } - private List leastRecentlyAccessedTables(long retention, String[] databases, int limit) - throws MetaException { - int current = nowInSeconds(); // todo: use db time - List
tables = new ArrayList
(); + private Map getRetentionTargets(String[] databases) throws MetaException { boolean committed = false; + Map targets = new HashMap(); try { openTransaction(); - Query query = pm.newQuery(MTable.class); - StringBuilder builder = new StringBuilder(); - if (retention > 0) { - builder.append("((retention <= 0 && createTime " + retention + " <" + current + ") ||"); - } - builder.append(" (retention > 0 && createTime + retention < " + current + ")"); - if (retention > 0) { - builder.append(')'); - } - if (databases != null) { - builder.append(" && database.name in ("); - for (int i = 0; i < databases.length; i++) { - if (i > 0) { - builder.append(", "); - } - builder.append("\"" + databases[i] + "\""); - } - builder.append(")"); - } - query.setFilter(builder.toString()); - query.setOrdering("lastAccessTime ascending, createTime ascending"); - if (limit > 0) { - query.setRange(0, limit); - } - Collection mtables = (Collection) query.execute(); - for (Iterator iter = mtables.iterator(); iter.hasNext();) { - tables.add(convertToTable((MTable) iter.next())); + int current = nowInSeconds(); // todo: use db time + for (Object t : getTableRetentions(databases, current)) { + MTable mTable = (MTable)t; + MDatabase database = mTable.getDatabase(); + String[] name = new String[] {database.getName(), mTable.getTableName()}; + targets.put(name, new Integer[]{mTable.getCreateTime(), mTable.getRetention()}); + } + for (Object p : getPartitionRetentions(databases, current)) { + MPartition mPart = (MPartition)p; + MTable mTable = mPart.getTable(); + MDatabase database = mTable.getDatabase(); + List partCols = convertToFieldSchemas(mTable.getPartitionKeys()); + String partName = Warehouse.makePartName(partCols, mPart.getValues()); + String[] name = new String[] {database.getName(), mTable.getTableName(), partName}; + targets.put(name, new Integer[]{mPart.getCreateTime(), mTable.getRetention()}); } committed = commitTransaction(); } finally { @@ -1009,7 +1024,45 @@ private int 
nowInSeconds() { rollbackTransaction(); } } - return tables; + return targets; + } + + private Collection getTableRetentions(String[] databases, int current) { + Query query = pm.newQuery(MTable.class); + StringBuilder builder = new StringBuilder(); + builder.append("partitionKeys == null && "); + builder.append("retention > 0 && createTime + retention < " + current); + if (databases != null && databases.length > 0) { + builder.append(" && database.name in ("); + for (int i = 0; i < databases.length; i++) { + if (i > 0) { + builder.append(", "); + } + builder.append("\"" + databases[i] + "\""); + } + builder.append(")"); + } + query.setFilter(builder.toString()); + return (Collection) query.execute(); + } + + private Collection getPartitionRetentions(String[] databases, int current) { + Query query = pm.newQuery(MPartition.class); + StringBuilder builder = new StringBuilder(); + builder.append("table.partitionKeys != null && "); + builder.append("table.retention > 0 && createTime + table.retention < " + current); + if (databases != null && databases.length > 0) { + builder.append(" && table.database.name in ("); + for (int i = 0; i < databases.length; i++) { + if (i > 0) { + builder.append(", "); + } + builder.append("\"" + databases[i] + "\""); + } + builder.append(")"); + } + query.setFilter(builder.toString()); + return (Collection) query.execute(); } @Override diff --git ql/src/test/queries/clientpositive/alter_table_retention.q ql/src/test/queries/clientpositive/alter_table_retention.q new file mode 100644 index 0000000..d521755 --- /dev/null +++ ql/src/test/queries/clientpositive/alter_table_retention.q @@ -0,0 +1,20 @@ +-- test table +create table test_table (id int, query string, name string); +describe formatted test_table; + +alter table test_table set retention 120 sec; +describe formatted test_table; + +alter table test_table set retention 30 min; +describe formatted test_table; + +alter table test_table set retention 12 hours; +describe formatted 
test_table; + +alter table test_table set retention 7 days; +describe formatted test_table; + +alter table test_table unset retention; +describe formatted test_table; + +drop table test_table; diff --git ql/src/test/results/clientpositive/alter_table_retention.q.out ql/src/test/results/clientpositive/alter_table_retention.q.out new file mode 100644 index 0000000..464fc5c --- /dev/null +++ ql/src/test/results/clientpositive/alter_table_retention.q.out @@ -0,0 +1,279 @@ +PREHOOK: query: -- test table +create table test_table (id int, query string, name string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- test table +create table test_table (id int, query string, name string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_table +PREHOOK: query: describe formatted test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: alter table test_table set retention 120 sec +PREHOOK: type: null +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: alter table test_table set retention 120 sec +POSTHOOK: type: null +POSTHOOK: Input: default@test_table +POSTHOOK: Output: 
default@test_table +PREHOOK: query: describe formatted test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 120 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE false +#### A masked pattern was here #### + numFiles 0 + numRows -1 + rawDataSize -1 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: alter table test_table set retention 30 min +PREHOOK: type: null +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: alter table test_table set retention 30 min +POSTHOOK: type: null +POSTHOOK: Input: default@test_table +POSTHOOK: Output: default@test_table +PREHOOK: query: describe formatted test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 1800 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE false +#### A masked pattern was here #### + numFiles 0 + numRows -1 + rawDataSize -1 + totalSize 0 +#### A masked pattern was here #### + +# Storage 
Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: alter table test_table set retention 12 hours +PREHOOK: type: null +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: alter table test_table set retention 12 hours +POSTHOOK: type: null +POSTHOOK: Input: default@test_table +POSTHOOK: Output: default@test_table +PREHOOK: query: describe formatted test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 43200 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE false +#### A masked pattern was here #### + numFiles 0 + numRows -1 + rawDataSize -1 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: alter table test_table set retention 7 days +PREHOOK: type: null +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: alter table test_table set retention 7 days +POSTHOOK: type: null +POSTHOOK: Input: default@test_table +POSTHOOK: Output: default@test_table +PREHOOK: query: describe formatted 
test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 604800 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE false +#### A masked pattern was here #### + numFiles 0 + numRows -1 + rawDataSize -1 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: alter table test_table unset retention +PREHOOK: type: null +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: alter table test_table unset retention +POSTHOOK: type: null +POSTHOOK: Input: default@test_table +POSTHOOK: Output: default@test_table +PREHOOK: query: describe formatted test_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@test_table +POSTHOOK: query: describe formatted test_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@test_table +# col_name data_type comment + +id int +query string +name string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE false +#### A masked pattern was here #### + numFiles 0 + numRows -1 + rawDataSize -1 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table test_table +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@test_table +PREHOOK: Output: default@test_table +POSTHOOK: query: drop table test_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@test_table +POSTHOOK: Output: default@test_table