diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 7afef0fce6..4efcb79659 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -202,6 +202,96 @@ public void testMajorCompaction() throws Exception {
     executeStatementOnDriver("drop table " + tblName, driver);
   }
 
+  @Test
+  public void testOptimizeCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "testOptimizeCompaction";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets"
+        + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+        + " 'transactional_properties'='default')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
+    executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+    FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+    String[] deltas = new String[filestatus.length];
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = filestatus[i].getPath().getName();
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+    }
+    // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+    FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
+        AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+    }
+    List<String> expectedRsBucket0 = new ArrayList<>();
+    expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+    expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4");
+    expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+    expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4");
+    List<String> expectedRsBucket1 = new ArrayList<>();
+    expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+    expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t1\t4");
+    expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+    expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4");
+    // Bucket 0
+    List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+        + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver);
+    Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0);
+    // Bucket 1
+    List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+        + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver);
+    Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1);
+    // Run optimize compaction and cleaner
+    CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.OPTIMIZE, true);
+    CompactorTestUtil.runCleaner(conf);
+    // Should contain only one base directory now
+    filestatus = fs.listStatus(new Path(table.getSd().getLocation()));
+    String[] bases = new String[filestatus.length];
+    for (int i = 0; i < bases.length; i++) {
+      bases[i] = filestatus[i].getPath().getName();
+    }
+
+    Arrays.sort(bases);
+    String[] expectedBases = new String[] { "base_0000003" };
+
+    List<String> expectedRsCompact = new ArrayList<>();
+    expectedRsCompact.add("2\t3");
+    expectedRsCompact.add("2\t4");
+    expectedRsCompact.add("3\t3");
+    expectedRsCompact.add("3\t4");
+    expectedRsCompact.add("1\t3");
+    expectedRsCompact.add("1\t4");
+    expectedRsCompact.add("4\t3");
+    expectedRsCompact.add("4\t4");
+
+    if (!Arrays.deepEquals(expectedBases, bases)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " + Arrays.toString(bases));
+    }
+
+    List<String> rsCompact = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
+    Assert.assertEquals("compacted read", expectedRsCompact, rsCompact);
+
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
   @Test
   public void testMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
     String dbName = "default";
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 274da31317..5f5ab4cf9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1086,7 +1086,7 @@ private void analyzeAlterTableCompact(ASTNode ast, TableName tableName,
     String type = unescapeSQLString(ast.getChild(0).getText()).toLowerCase();
 
-    if (!type.equals("minor") && !type.equals("major")) {
+    if (!type.equals("minor") && !type.equals("major") && !type.equals("optimize")) {
       throw new SemanticException(ErrorMsg.INVALID_COMPACTION_TYPE.getMsg());
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OptimizeQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OptimizeQueryCompactor.java
new file mode 100644
index 0000000000..8866e8ee5d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OptimizeQueryCompactor.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class responsible for running query based optimize compaction.
+ */
+final class OptimizeQueryCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OptimizeQueryCompactor.class.getName());
+
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+
+    String tmpLocation = Util.generateTmpPath(storageDescriptor);
+    Path baseLocation = new Path(tmpLocation, "_base");
+
+    // Set up the session for driver.
+    HiveConf driverConf = new HiveConf(hiveConf);
+    driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+
+    // Note: we could skip creating the table and just add table type stuff directly to the
+    // "insert overwrite directory" command if there were no bucketing or list bucketing.
+    String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
+    String tmpTableName = tmpPrefix + System.currentTimeMillis();
+    List<String> createTableQueries =
+        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
+            baseLocation.toString());
+    List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
+    List<String> dropQueries = getDropQueries(tmpTableName);
+    runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+        createTableQueries, compactionQueries, dropQueries);
+  }
+
+  /**
+   * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+   * @param dest The final directory; basically a SD directory. Not the actual base/delta.
+   * @param compactorTxnId txn that the compactor started
+   */
+  @Override
+  protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+    org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+    String from = tempTable.getSd().getLocation();
+    Path fromPath = new Path(from), toPath = new Path(dest);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as maximum transaction ID.
+    //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
+    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+            .statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
+      fs.mkdirs(newBaseDir);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + dest);
+    fs.rename(fromPath, newBaseDir);
+    fs.delete(fromPath.getParent(), true);
+  }
+
+  private List<String> getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) {
+    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(col.getType());
+    }
+    query.append(") ");
+
+    // Bucketing.
+    List<String> buckCols = t.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = t.getSd().getSortCols();
+      if (sortCols.size() > 0) {
+        query.append("SORTED BY (");
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+
+    // Stored as directories. We don't care about the skew otherwise.
+    if (t.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+        .append("'");
+    String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    assert sh == null; // Not supposed to be a compactable table.
+    if (!serdeParams.isEmpty()) {
+      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+    // Exclude all standard table properties.
+    Set<String> excludes = getHiveMetastoreConstants();
+    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+    isFirst = true;
+    for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+      if (e.getValue() == null) {
+        continue;
+      }
+      if (excludes.contains(e.getKey())) {
+        continue;
+      }
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
+          .append("'");
+    }
+    if (!isFirst) {
+      query.append(", ");
+    }
+    query.append("'transactional'='false')");
+    return Lists.newArrayList(query.toString());
+
+  }
+
+  private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    // ideally we should make a special form of insert overwrite so that we:
+    // 1) Could use fast merge path for ORC and RC.
+    // 2) Didn't have to create a table.
+
+    StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " ");
+    StringBuilder filter = new StringBuilder();
+    if (p != null) {
+      filter = new StringBuilder(" where ");
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
+            .append("'");
+      }
+      query.append(" select ");
+      // Use table descriptor for columns.
+      List<FieldSchema> cols = t.getSd().getCols();
+      for (int i = 0; i < cols.size(); ++i) {
+        query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+      }
+    } else {
+      query.append("select *");
+    }
+    query.append(" from ").append(fullName).append(filter);
+
+    String optimizeColumns = t.getParameters().get(hive_metastoreConstants.TABLE_OPTMIZE_COMPACT_COLUMNS);
+
+    if (optimizeColumns != null) {
+      query.append(" CLUSTER BY ").append(optimizeColumns);
+    }
+    return Lists.newArrayList(query.toString());
+  }
+
+  private List<String> getDropQueries(String tmpTableName) {
+    return Lists.newArrayList("drop table if exists " + tmpTableName);
+  }
+
+  private static Set<String> getHiveMetastoreConstants() {
+    Set<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isStatic(f.getModifiers())) {
+        continue;
+      }
+      if (!Modifier.isFinal(f.getModifiers())) {
+        continue;
+      }
+      if (!String.class.equals(f.getType())) {
+        continue;
+      }
+      f.setAccessible(true);
+      try {
+        result.add((String) f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 2f2bb21a13..a61e88e52a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -42,6 +42,9 @@ private QueryCompactorFactory() {
   *
   * {@link MmMajorQueryCompactor} - handles query based minor compaction for micro-managed tables
+  *
+  * {@link OptimizeQueryCompactor} - handles query based optimize compaction
+  *
   *
   * @param table the table, on which the compaction should be running, must be not null.
   * @param configuration the hive configuration, must be not null.
@@ -53,7 +56,7 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com
         .getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
       if (compactionInfo.isMajorCompaction()) {
         return new MajorQueryCompactor();
-      } else if (!compactionInfo.isMajorCompaction() && "tez"
+      } else if (!compactionInfo.isMajorCompaction() && !compactionInfo.isOptimizeCompaction() && "tez"
          .equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
         // query based minor compaction is only supported on tez
         return new MinorQueryCompactor();
@@ -65,6 +68,10 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com
       return new MmMajorQueryCompactor();
     }
 
+    if (compactionInfo.isOptimizeCompaction()) {
+      return new OptimizeQueryCompactor();
+    }
+
     return null;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 5aff71e0e9..78c10593df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -24,7 +24,13 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -166,6 +172,23 @@ public void run() {
            * multi-stmt txn.
            * {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
           long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+          // If we have a compaction of type CompactionType.OPTIMIZE, we take a semi-shared lock.
+          if (ci.isOptimizeCompaction()) {
+            LockRequest lockRequest = createLockRequest(t, ci.getFullPartitionName(), ci.runAs, compactorTxnId, getName());
+            try {
+              LockResponse res = msc.lock(lockRequest);
+              if (res.getState() != LockState.ACQUIRED) {
+                LOG.error("Unable to acquire lock on " + ci.getFullPartitionName() + " for performing an optimize compaction");
+                msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+                continue;
+              }
+            } catch (TException e) {
+              LOG.error("Unable to acquire lock on " + ci.getFullPartitionName() + " for performing an optimize compaction", e);
+              msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+              continue;
+            }
+          }
+
           heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
           heartbeater.start();
@@ -270,6 +293,26 @@ public Object run() throws Exception {
     } while (!stop.get());
   }
 
+  static LockRequest createLockRequest(Table tbl, String partNameForLock, String user, long txnId, String agentInfo) {
+    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+    requestBuilder.setUser(user);
+    requestBuilder.setTransactionId(txnId);
+
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+        .setDbName(tbl.getDbName())
+        .setTableName(tbl.getTableName())
+        .setSemiShared()
+        .setIsDynamicPartitionWrite(false)
+        .setOperationType(DataOperationType.NO_TXN);
+
+    if (partNameForLock != null && !partNameForLock.isEmpty()) {
+      lockCompBuilder.setPartitionName(partNameForLock);
+    }
+    requestBuilder.addLockComponent(lockCompBuilder.build());
+    return requestBuilder.build();
+  }
+
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     super.init(stop, looped);
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
index 7450b27cf3..3a5265753d 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
@@ -13,7 +13,8 @@
 public enum CompactionType implements org.apache.thrift.TEnum {
   MINOR(1),
-  MAJOR(2);
+  MAJOR(2),
+  OPTIMIZE(3);
 
   private final int value;
 
@@ -38,6 +39,8 @@ public static CompactionType findByValue(int value) {
       return MINOR;
     case 2:
       return MAJOR;
+    case 3:
+      return OPTIMIZE;
     default:
       return null;
   }
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index e450d362d8..ed74d77cee 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -98,4 +98,6 @@
   public static final String JDBC_CONFIG_PREFIX = "hive.sql.";
 
+  public static final String TABLE_OPTMIZE_COMPACT_COLUMNS = "optimize_bucket_columns";
+
 }
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index 093ad4be27..644a33a6e1 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -99,9 +99,11 @@ final class LockType {
 final class CompactionType {
   const MINOR = 1;
   const MAJOR = 2;
+  const OPTIMIZE = 3;
 
   static public $__names = array(
     1 => 'MINOR',
     2 => 'MAJOR',
+    3 => 'OPTIMIZE',
   );
 }
@@ -38735,6 +38737,7 @@ final class Constant extends \Thrift\Type\TConstant {
   static protected $TABLE_BUCKETING_VERSION;
   static protected $DRUID_CONFIG_PREFIX;
   static protected $JDBC_CONFIG_PREFIX;
+  static protected $TABLE_OPTMIZE_COMPACT_COLUMNS;
 
   static protected function init_DDL_TIME() {
     return "transient_lastDdlTime";
@@ -38859,6 +38862,10 @@ final class Constant extends \Thrift\Type\TConstant {
   static protected function init_JDBC_CONFIG_PREFIX() {
     return "hive.sql.";
   }
+
+  static protected function init_TABLE_OPTMIZE_COMPACT_COLUMNS() {
+    return "optimize_bucket_columns";
+  }
 }
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
index 61d68929ab..b4bb029214 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
@@ -40,3 +40,4 @@
 TABLE_BUCKETING_VERSION = "bucketing_version"
 DRUID_CONFIG_PREFIX = "druid."
 JDBC_CONFIG_PREFIX = "hive.sql."
+TABLE_OPTMIZE_COMPACT_COLUMNS = "optimize_bucket_columns"
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 0dcca59b68..163f99e5da 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -143,15 +143,18 @@ class LockType:
 class CompactionType:
   MINOR = 1
   MAJOR = 2
+  OPTIMIZE = 3
 
   _VALUES_TO_NAMES = {
     1: "MINOR",
     2: "MAJOR",
+    3: "OPTIMIZE",
   }
 
   _NAMES_TO_VALUES = {
     "MINOR": 1,
     "MAJOR": 2,
+    "OPTIMIZE": 3,
   }
 
 class GrantRevokeType:
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index 2b117280f8..8d29843940 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -69,3 +69,5 @@
 DRUID_CONFIG_PREFIX = %q"druid."
 
 JDBC_CONFIG_PREFIX = %q"hive.sql."
+
+TABLE_OPTMIZE_COMPACT_COLUMNS = %q"optimize_bucket_columns"
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 8d7c32a765..eb897f3bf8 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -68,8 +68,9 @@ end
 module CompactionType
   MINOR = 1
   MAJOR = 2
-  VALUE_MAP = {1 => "MINOR", 2 => "MAJOR"}
-  VALID_VALUES = Set.new([MINOR, MAJOR]).freeze
+  OPTIMIZE = 3
+  VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "OPTIMIZE"}
+  VALID_VALUES = Set.new([MINOR, MAJOR, OPTIMIZE]).freeze
 end
 
 module GrantRevokeType
diff --git standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 72ccdd1a0f..307ce5c4be 100644
--- standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -180,6 +180,7 @@ enum LockType {
 enum CompactionType {
     MINOR = 1,
     MAJOR = 2,
+    OPTIMIZE = 3,
 }
 
 enum GrantRevokeType {
@@ -2667,4 +2668,5 @@
 const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties",
 const string TABLE_BUCKETING_VERSION = "bucketing_version",
 const string DRUID_CONFIG_PREFIX = "druid.",
 const string JDBC_CONFIG_PREFIX = "hive.sql.",
+const string TABLE_OPTMIZE_COMPACT_COLUMNS = "optimize_bucket_columns",
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index b1a92ef03c..25934333d4 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -116,6 +117,8 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
     boolean isTransactionalPropertiesPresent = false;
     String transactionalPropertiesValue = null;
     boolean hasValidTransactionalValue = false;
+    boolean isOptimizeBucketColumns = false;
+    String optimizedBucketColumnsValue = null;
 
     for (String key : keys) {
       if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
@@ -129,6 +132,11 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
         // Do not remove the parameter yet, because we have separate initialization routine
         // that will use it down below.
       }
+
+      if(hive_metastoreConstants.TABLE_OPTMIZE_COMPACT_COLUMNS.equalsIgnoreCase(key)) {
+        isOptimizeBucketColumns = true;
+        optimizedBucketColumnsValue = parameters.get(key);
+      }
     }
     Table oldTable = context.getOldTable();
     String oldTransactionalValue = null;
@@ -226,6 +234,24 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
       t.seedWriteIdOnAcidConversion(new InitializeTableWriteIdsRequest(newTable.getDbName(), newTable.getTableName(),
           10000000));
     }
+
+    if (isOptimizeBucketColumns) {
+      List<String> listColumns = Arrays.asList(optimizedBucketColumnsValue.split(","));
+      List<FieldSchema> colList = newTable.getSd().getCols();
+      for (String colName : listColumns) {
+        boolean foundCol = false;
+        for (FieldSchema mCol : colList) {
+          if (mCol.getName().equals(colName)) {
+            foundCol = true;
+            break;
+          }
+        }
+        if (!foundCol) {
+          throw new MetaException("Column " + colName + " doesn't exist in table "
+              + newTable.getTableName() + " in database " + newTable.getDbName());
+        }
+      }
+    }
   }
 
   private void checkSorted(Table newTable) throws MetaException {
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index ba45f39452..a24755e234 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -103,6 +103,10 @@ public boolean isMajorCompaction() {
     return CompactionType.MAJOR == type;
   }
 
+  public boolean isOptimizeCompaction() {
+    return CompactionType.OPTIMIZE == type;
+  }
+
   @Override
   public int compareTo(CompactionInfo o) {
     return getFullPartitionName().compareTo(o.getFullPartitionName());
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index aded6f5486..80d9d4840c 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -279,6 +279,7 @@ public void markCompacted(CompactionInfo info) throws MetaException {
         switch (rs.getString(5).charAt(0)) {
           case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
           case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+          case OPTIMIZE_TYPE: info.type = CompactionType.OPTIMIZE; break;
          default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
         }
         info.runAs = rs.getString(6);
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 1dc3867929..30c62aa905 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -163,6 +163,7 @@
   // Compactor types
   static final protected char MAJOR_TYPE = 'a';
   static final protected char MINOR_TYPE = 'i';
+  static final protected char OPTIMIZE_TYPE = 'o';
 
   // Transaction states
   static final protected char TXN_ABORTED = 'a';
@@ -3125,6 +3126,10 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
       buf.append(MINOR_TYPE);
       break;
+    case OPTIMIZE:
+      buf.append(OPTIMIZE_TYPE);
+      break;
+
     default:
       LOG.debug("Going to rollback");
       dbConn.rollback();
@@ -3208,6 +3213,7 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep
         switch (rs.getString(5).charAt(0)) {
           case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
           case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
+          case OPTIMIZE_TYPE: e.setType(CompactionType.OPTIMIZE); break;
           default:
             //do nothing to handle RU/D if we add another status
         }
@@ -5325,6 +5331,8 @@ static CompactionType dbCompactionType2ThriftType(char dbValue) {
       return CompactionType.MAJOR;
     case MINOR_TYPE:
       return CompactionType.MINOR;
+    case OPTIMIZE_TYPE:
+      return CompactionType.OPTIMIZE;
     default:
       LOG.warn("Unexpected compaction type " + dbValue);
       return null;
@@ -5336,6 +5344,8 @@ static Character thriftCompactionType2DbType(CompactionType ct) {
       return MAJOR_TYPE;
     case MINOR:
       return MINOR_TYPE;
+    case OPTIMIZE:
+      return OPTIMIZE_TYPE;
     default:
       LOG.warn("Unexpected compaction type " + ct);
       return null;
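
Usage sketch (not part of the patch): the snippet below shows, in the style of testOptimizeCompaction above, how a table could opt in to the new compaction type. It assumes the 'optimize' keyword accepted by analyzeAlterTableCompact is mapped to CompactionType.OPTIMIZE further down the ALTER TABLE ... COMPACT path (that wiring is not shown in this diff), and it reuses the tblName/dbName/conf/driver fixtures from TestCrudCompactorOnTez.

    // Illustrative only. Declare the columns the optimize compaction should CLUSTER BY;
    // the 'optimize_bucket_columns' value is validated against the table schema by
    // TransactionalValidationListener and picked up by OptimizeQueryCompactor.
    executeStatementOnDriver("alter table " + tblName
        + " set TBLPROPERTIES('optimize_bucket_columns'='a')", driver);
    // Either queue the compaction through DDL ('optimize' is now accepted) ...
    executeStatementOnDriver("alter table " + tblName + " compact 'optimize'", driver);
    // ... or enqueue it directly, as the new test does:
    CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.OPTIMIZE, true);
    CompactorTestUtil.runCleaner(conf);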