diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 7afef0fce6..4efcb79659 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -202,6 +202,96 @@ public void testMajorCompaction() throws Exception {
executeStatementOnDriver("drop table " + tblName, driver);
}
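+ /**
+ * End-to-end test for the new OPTIMIZE compaction type: creates a bucketed full-ACID table,
+ * produces two insert deltas and one delete delta, runs an OPTIMIZE compaction plus the cleaner,
+ * and verifies that a single base directory with the expected rows remains.
+ */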
+ @Test
+ public void testOptimizeCompaction() throws Exception {
+ String dbName = "default";
+ String tblName = "testOptimizeCompaction";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets"
+ + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+ + " 'transactional_properties'='default')", driver);
+ executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
+ executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver);
+ executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+ FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] deltas = new String[filestatus.length];
+ for (int i = 0; i < deltas.length; i++) {
+ deltas[i] = filestatus[i].getPath().getName();
+ }
+ Arrays.sort(deltas);
+ String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
+ if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+ }
+ // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+ FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
+ AcidUtils.deleteEventDeltaDirFilter);
+ String[] deleteDeltas = new String[deleteDeltaStat.length];
+ for (int i = 0; i < deleteDeltas.length; i++) {
+ deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+ }
+ Arrays.sort(deleteDeltas);
+ String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
+ if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+ }
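+ // 536870912 and 536936448 are the encoded ROW__ID bucket ids for buckets 0 and 1 respectively.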
+ List<String> expectedRsBucket0 = new ArrayList<>();
+ expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+ expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4");
+ expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+ expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4");
+ List<String> expectedRsBucket1 = new ArrayList<>();
+ expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+ expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t1\t4");
+ expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+ expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4");
+ // Bucket 0
+ List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver);
+ Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0);
+ // Bucket 1
+ List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver);
+ Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1);
+ // Run OPTIMIZE compaction and the cleaner
+ CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.OPTIMIZE, true);
+ CompactorTestUtil.runCleaner(conf);
+ // Should contain only one base directory now
+ filestatus = fs.listStatus(new Path(table.getSd().getLocation()));
+ String[] bases = new String[filestatus.length];
+ for (int i = 0; i < bases.length; i++) {
+ bases[i] = filestatus[i].getPath().getName();
+ }
+
+ Arrays.sort(bases);
+ String[] expectedBases = new String[] { "base_0000003" };
+
+ List<String> expectedRsCompact = new ArrayList<>();
+ expectedRsCompact.add("2\t3");
+ expectedRsCompact.add("2\t4");
+ expectedRsCompact.add("3\t3");
+ expectedRsCompact.add("3\t4");
+ expectedRsCompact.add("1\t3");
+ expectedRsCompact.add("1\t4");
+ expectedRsCompact.add("4\t3");
+ expectedRsCompact.add("4\t4");
+
+ if (!Arrays.deepEquals(expectedBases, bases)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " + Arrays.toString(bases));
+ }
+
+ List<String> rsCompact = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
+ Assert.assertEquals("compacted read", expectedRsCompact, rsCompact);
+
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
@Test
public void testMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
String dbName = "default";
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 274da31317..5f5ab4cf9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1086,7 +1086,7 @@ private void analyzeAlterTableCompact(ASTNode ast, TableName tableName,
String type = unescapeSQLString(ast.getChild(0).getText()).toLowerCase();
- if (!type.equals("minor") && !type.equals("major")) {
+ if (!type.equals("minor") && !type.equals("major") && !type.equals("optimize")) {
throw new SemanticException(ErrorMsg.INVALID_COMPACTION_TYPE.getMsg());
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OptimizeQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OptimizeQueryCompactor.java
new file mode 100644
index 0000000000..8866e8ee5d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OptimizeQueryCompactor.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class responsible for running query-based OPTIMIZE compaction: the table (or partition) data is
+ * rewritten into a new base, optionally clustered by the columns named in 'optimize_bucket_columns'.
+ */
+final class OptimizeQueryCompactor extends QueryCompactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptimizeQueryCompactor.class.getName());
+
+ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+
+ String tmpLocation = Util.generateTmpPath(storageDescriptor);
+ Path baseLocation = new Path(tmpLocation, "_base");
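+ // The temporary table is created over a scratch "_base" directory; commitCompaction later
+ // moves its contents into the final base_<writeId> directory under the table/partition location.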
+
+ // Set up the session for driver.
+ HiveConf driverConf = new HiveConf(hiveConf);
+ driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+
+ // Note: we could skip creating the table and just add table type stuff directly to the
+ // "insert overwrite directory" command if there were no bucketing or list bucketing.
+ String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
+ String tmpTableName = tmpPrefix + System.currentTimeMillis();
+ List<String> createTableQueries =
+ getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
+ baseLocation.toString());
+ List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
+ List<String> dropQueries = getDropQueries(tmpTableName);
+ runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+ createTableQueries, compactionQueries, dropQueries);
+ }
+
+ /**
+ * Note: similar logic to the major-compaction committer, but without the ORC ACID version file handling.
+ * @param dest The final directory; basically a SD directory. Not the actual base/delta.
+ * @param compactorTxnId txn that the compactor started
+ */
+ @Override
+ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+ org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+ String from = tempTable.getSd().getLocation();
+ Path fromPath = new Path(from), toPath = new Path(dest);
+ FileSystem fs = fromPath.getFileSystem(conf);
+ // Assume the high watermark can be used as maximum transaction ID.
+ //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
+ //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
+ long maxTxn = actualWriteIds.getHighWatermark();
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+ .statementId(-1);
+ Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
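+ // For example, with a high watermark of 3 this resolves to <table location>/base_0000003,
+ // which is the directory the test above expects.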
+ if (!fs.exists(fromPath)) {
+ LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
+ fs.mkdirs(newBaseDir);
+ return;
+ }
+ LOG.info("Moving contents of " + from + " to " + dest);
+ fs.rename(fromPath, newBaseDir);
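+ // Delete the temporary scratch directory now that its "_base" contents have been moved.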
+ fs.delete(fromPath.getParent(), true);
+ }
+
+ private List<String> getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) {
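+ // For the test table above this builds, roughly:
+ //   create temporary table default.tmp_compactor_<table>_<timestamp> (`a` int, `b` int)
+ //   CLUSTERED BY (a) INTO 2 BUCKETS ROW FORMAT SERDE '...' STORED AS INPUTFORMAT '...'
+ //   OUTPUTFORMAT '...' LOCATION '<tmp>/_base' TBLPROPERTIES (..., 'transactional'='false')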
+ StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
+ List<FieldSchema> cols = t.getSd().getCols();
+ boolean isFirst = true;
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ").append(col.getType());
+ }
+ query.append(") ");
+
+ // Bucketing.
+ List<String> buckCols = t.getSd().getBucketCols();
+ if (buckCols.size() > 0) {
+ query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+ List<Order> sortCols = t.getSd().getSortCols();
+ if (sortCols.size() > 0) {
+ query.append("SORTED BY (");
+ isFirst = true;
+ for (Order sortCol : sortCols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+ }
+ query.append(") ");
+ }
+ query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+ }
+
+ // Stored as directories. We don't care about the skew otherwise.
+ if (t.getSd().isStoredAsSubDirectories()) {
+ SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+ if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+ query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+ isFirst = true;
+ for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("('").append(StringUtils.join("','", colValues)).append("')");
+ }
+ query.append(") STORED AS DIRECTORIES");
+ }
+ }
+
+ SerDeInfo serdeInfo = sd.getSerdeInfo();
+ Map<String, String> serdeParams = serdeInfo.getParameters();
+ query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+ .append("'");
+ String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+ assert sh == null; // Not supposed to be a compactable table.
+ if (!serdeParams.isEmpty()) {
+ ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+ }
+ query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+ .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+ .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+ // Exclude all standard table properties.
+ Set<String> excludes = getHiveMetastoreConstants();
+ excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+ isFirst = true;
+ for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+ if (e.getValue() == null) {
+ continue;
+ }
+ if (excludes.contains(e.getKey())) {
+ continue;
+ }
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
+ .append("'");
+ }
+ if (!isFirst) {
+ query.append(", ");
+ }
+ query.append("'transactional'='false')");
+ return Lists.newArrayList(query.toString());
+
+ }
+
+ private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
+ String fullName = t.getDbName() + "." + t.getTableName();
+ // ideally we should make a special form of insert overwrite so that we:
+ // 1) Could use fast merge path for ORC and RC.
+ // 2) Didn't have to create a table.
+
+ StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " ");
+ StringBuilder filter = new StringBuilder();
+ if (p != null) {
+ filter = new StringBuilder(" where ");
+ List<String> vals = p.getValues();
+ List<FieldSchema> keys = t.getPartitionKeys();
+ assert keys.size() == vals.size();
+ for (int i = 0; i < keys.size(); ++i) {
+ filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
+ .append("'");
+ }
+ query.append(" select ");
+ // Use table descriptor for columns.
+ List<FieldSchema> cols = t.getSd().getCols();
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+ }
+ } else {
+ query.append("select *");
+ }
+ query.append(" from ").append(fullName).append(filter);
+
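+ // If the table sets 'optimize_bucket_columns', the rewritten rows are clustered by those columns;
+ // for an unpartitioned table with 'optimize_bucket_columns'='a' this yields, roughly:
+ //   insert overwrite table <tmp table> select * from <db>.<table> CLUSTER BY a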
+ String optimizeColumns = t.getParameters().get(hive_metastoreConstants.TABLE_OPTMIZE_COMPACT_COLUMNS);
+
+ if (optimizeColumns != null) {
+ query.append(" CLUSTER BY ").append(optimizeColumns);
+ }
+ return Lists.newArrayList(query.toString());
+ }
+
+ private List<String> getDropQueries(String tmpTableName) {
+ return Lists.newArrayList("drop table if exists " + tmpTableName);
+ }
+
+ private static Set<String> getHiveMetastoreConstants() {
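+ // Collect, via reflection, every static final String constant declared on hive_metastoreConstants;
+ // these are treated as reserved keys and excluded when copying TBLPROPERTIES to the temp table.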
+ Set<String> result = new HashSet<>();
+ for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+ if (!Modifier.isStatic(f.getModifiers())) {
+ continue;
+ }
+ if (!Modifier.isFinal(f.getModifiers())) {
+ continue;
+ }
+ if (!String.class.equals(f.getType())) {
+ continue;
+ }
+ f.setAccessible(true);
+ try {
+ result.add((String) f.get(null));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 2f2bb21a13..a61e88e52a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -42,6 +42,9 @@ private QueryCompactorFactory() {
*
* {@link MmMajorQueryCompactor} - handles query based minor compaction for micro-managed tables
*
+ *
+ * {@link OptimizeQueryCompactor} - handles query based OPTIMIZE compaction for full CRUD tables
+ *
*
* @param table the table, on which the compaction should be running, must be not null.
* @param configuration the hive configuration, must be not null.
@@ -53,7 +56,7 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com
.getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
if (compactionInfo.isMajorCompaction()) {
return new MajorQueryCompactor();
- } else if (!compactionInfo.isMajorCompaction() && "tez"
+ } else if (!compactionInfo.isMajorCompaction() && !compactionInfo.isOptimizeCompaction() && "tez"
.equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
// query based minor compaction is only supported on tez
return new MinorQueryCompactor();
@@ -65,6 +68,10 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com
return new MmMajorQueryCompactor();
}
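+ // OPTIMIZE compactions are always handled by the query-based OptimizeQueryCompactor.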
+ if (compactionInfo.isOptimizeCompaction()) {
+ return new OptimizeQueryCompactor();
+ }
+
return null;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 5aff71e0e9..78c10593df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -24,7 +24,13 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -166,6 +172,23 @@ public void run() {
* multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+ // If we have a compaction of type CompactionType.OPTIMIZE we take a semi-shared lock
+ if (ci.isOptimizeCompaction()) {
+ LockRequest lockRequest = createLockRequest(t, ci.getFullPartitionName(), ci.runAs, compactorTxnId, getName());
+ try {
+ LockResponse res = msc.lock(lockRequest);
+ if (res.getState() != LockState.ACQUIRED) {
+ LOG.error("Unable to acquire lock on " + ci.getFullPartitionName() + "for performing a optimized compaction");
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ continue;
+ }
+ } catch (TException e) {
+ LOG.error("Unable to acquire lock on " + ci.getFullPartitionName() + "for performing a optimized compaction", e);
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ continue;
+ }
+ }
+
heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
heartbeater.start();
@@ -270,6 +293,26 @@ public Object run() throws Exception {
} while (!stop.get());
}
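+ /**
+ * Builds a semi-shared lock request on the table (and partition, if any) for the OPTIMIZE
+ * compaction, tied to the compactor's open transaction.
+ */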
+ static LockRequest createLockRequest(Table tbl, String partNameForLock, String user, long txnId, String agentInfo) {
+ LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+ requestBuilder.setUser(user);
+ requestBuilder.setTransactionId(txnId);
+
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setDbName(tbl.getDbName())
+ .setTableName(tbl.getTableName())
+ .setSemiShared()
+ .setIsDynamicPartitionWrite(false)
+ .setOperationType(DataOperationType.NO_TXN);
+
+ if (partNameForLock != null && !partNameForLock.isEmpty()) {
+ lockCompBuilder.setPartitionName(partNameForLock);
+ }
+ requestBuilder.addLockComponent(lockCompBuilder.build());
+ return requestBuilder.build();
+ }
+
@Override
public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
super.init(stop, looped);
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
index 7450b27cf3..3a5265753d 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
@@ -13,7 +13,8 @@
public enum CompactionType implements org.apache.thrift.TEnum {
MINOR(1),
- MAJOR(2);
+ MAJOR(2),
+ OPTIMIZE(3);
private final int value;
@@ -38,6 +39,8 @@ public static CompactionType findByValue(int value) {
return MINOR;
case 2:
return MAJOR;
+ case 3:
+ return OPTIMIZE;
default:
return null;
}
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index e450d362d8..ed74d77cee 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -98,4 +98,6 @@
public static final String JDBC_CONFIG_PREFIX = "hive.sql.";
+ public static final String TABLE_OPTMIZE_COMPACT_COLUMNS = "optimize_bucket_columns";
+
}
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index 093ad4be27..644a33a6e1 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -99,9 +99,11 @@ final class LockType {
final class CompactionType {
const MINOR = 1;
const MAJOR = 2;
+ const OPTIMIZE = 3;
static public $__names = array(
1 => 'MINOR',
2 => 'MAJOR',
+ 3 => 'OPTIMIZE',
);
}
@@ -38735,6 +38737,7 @@ final class Constant extends \Thrift\Type\TConstant {
static protected $TABLE_BUCKETING_VERSION;
static protected $DRUID_CONFIG_PREFIX;
static protected $JDBC_CONFIG_PREFIX;
+ static protected $TABLE_OPTMIZE_COMPACT_COLUMNS;
static protected function init_DDL_TIME() {
return "transient_lastDdlTime";
@@ -38859,6 +38862,10 @@ final class Constant extends \Thrift\Type\TConstant {
static protected function init_JDBC_CONFIG_PREFIX() {
return "hive.sql.";
}
+
+ static protected function init_TABLE_OPTMIZE_COMPACT_COLUMNS() {
+ return "optimize_bucket_columns";
+ }
}
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
index 61d68929ab..b4bb029214 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
@@ -40,3 +40,4 @@
TABLE_BUCKETING_VERSION = "bucketing_version"
DRUID_CONFIG_PREFIX = "druid."
JDBC_CONFIG_PREFIX = "hive.sql."
+TABLE_OPTMIZE_COMPACT_COLUMNS = "optimize_bucket_columns"
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 0dcca59b68..163f99e5da 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -143,15 +143,18 @@ class LockType:
class CompactionType:
MINOR = 1
MAJOR = 2
+ OPTIMIZE = 3
_VALUES_TO_NAMES = {
1: "MINOR",
2: "MAJOR",
+ 3: "OPTIMIZE",
}
_NAMES_TO_VALUES = {
"MINOR": 1,
"MAJOR": 2,
+ "OPTIMIZE": 3,
}
class GrantRevokeType:
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index 2b117280f8..8d29843940 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -69,3 +69,5 @@ DRUID_CONFIG_PREFIX = %q"druid."
JDBC_CONFIG_PREFIX = %q"hive.sql."
+TABLE_OPTMIZE_COMPACT_COLUMNS = %q"optimize_bucket_columns"
+
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 8d7c32a765..eb897f3bf8 100644
--- standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -68,8 +68,9 @@ end
module CompactionType
MINOR = 1
MAJOR = 2
- VALUE_MAP = {1 => "MINOR", 2 => "MAJOR"}
- VALID_VALUES = Set.new([MINOR, MAJOR]).freeze
+ OPTIMIZE = 3
+ VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "OPTIMIZE"}
+ VALID_VALUES = Set.new([MINOR, MAJOR, OPTIMIZE]).freeze
end
module GrantRevokeType
diff --git standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 72ccdd1a0f..307ce5c4be 100644
--- standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -180,6 +180,7 @@ enum LockType {
enum CompactionType {
MINOR = 1,
MAJOR = 2,
+ OPTIMIZE = 3,
}
enum GrantRevokeType {
@@ -2667,4 +2668,5 @@ const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties",
const string TABLE_BUCKETING_VERSION = "bucketing_version",
const string DRUID_CONFIG_PREFIX = "druid.",
const string JDBC_CONFIG_PREFIX = "hive.sql.",
+const string TABLE_OPTMIZE_COMPACT_COLUMNS = "optimize_bucket_columns",
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index b1a92ef03c..25934333d4 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.metastore;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -116,6 +117,8 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
boolean isTransactionalPropertiesPresent = false;
String transactionalPropertiesValue = null;
boolean hasValidTransactionalValue = false;
+ boolean isOptimizeBucketColumns = false;
+ String optimizedBucketColumnsValue = null;
for (String key : keys) {
if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
@@ -129,6 +132,11 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
// Do not remove the parameter yet, because we have separate initialization routine
// that will use it down below.
}
+
+ if(hive_metastoreConstants.TABLE_OPTMIZE_COMPACT_COLUMNS.equalsIgnoreCase(key)) {
+ isOptimizeBucketColumns = true;
+ optimizedBucketColumnsValue = parameters.get(key);
+ }
}
Table oldTable = context.getOldTable();
String oldTransactionalValue = null;
@@ -226,6 +234,24 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
t.seedWriteIdOnAcidConversion(new InitializeTableWriteIdsRequest(newTable.getDbName(),
newTable.getTableName(), 10000000));
}
+
+ if (isOptimizeBucketColumns) {
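+ // Validate that every column listed in 'optimize_bucket_columns' exists in the new table schema.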
+ List<String> listColumns = Arrays.asList(optimizedBucketColumnsValue.split(","));
+ List<FieldSchema> colList = newTable.getSd().getCols();
+ for (String colName : listColumns) {
+ boolean foundCol = false;
+ for (FieldSchema mCol : colList) {
+ if (mCol.getName().equals(colName)) {
+ foundCol = true;
+ break;
+ }
+ }
+ if (!foundCol) {
+ throw new MetaException("Column " + colName + " doesn't exist in table "
+ + newTable.getTableName() + " in database " + newTable.getDbName());
+ }
+ }
+ }
}
private void checkSorted(Table newTable) throws MetaException {
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index ba45f39452..a24755e234 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -103,6 +103,10 @@ public boolean isMajorCompaction() {
return CompactionType.MAJOR == type;
}
+ public boolean isOptimizeCompaction() {
+ return CompactionType.OPTIMIZE == type;
+ }
+
@Override
public int compareTo(CompactionInfo o) {
return getFullPartitionName().compareTo(o.getFullPartitionName());
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index aded6f5486..80d9d4840c 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -279,6 +279,7 @@ public void markCompacted(CompactionInfo info) throws MetaException {
switch (rs.getString(5).charAt(0)) {
case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+ case OPTIMIZE_TYPE: info.type = CompactionType.OPTIMIZE; break;
default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
}
info.runAs = rs.getString(6);
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 1dc3867929..30c62aa905 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -163,6 +163,7 @@
// Compactor types
static final protected char MAJOR_TYPE = 'a';
static final protected char MINOR_TYPE = 'i';
+ static final protected char OPTIMIZE_TYPE = 'o';
// Transaction states
static final protected char TXN_ABORTED = 'a';
@@ -3125,6 +3126,10 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
buf.append(MINOR_TYPE);
break;
+ case OPTIMIZE:
+ buf.append(OPTIMIZE_TYPE);
+ break;
+
default:
LOG.debug("Going to rollback");
dbConn.rollback();
@@ -3208,6 +3213,7 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep
switch (rs.getString(5).charAt(0)) {
case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
+ case OPTIMIZE_TYPE: e.setType(CompactionType.OPTIMIZE); break;
default:
//do nothing to handle RU/D if we add another status
}
@@ -5325,6 +5331,8 @@ static CompactionType dbCompactionType2ThriftType(char dbValue) {
return CompactionType.MAJOR;
case MINOR_TYPE:
return CompactionType.MINOR;
+ case OPTIMIZE_TYPE:
+ return CompactionType.OPTIMIZE;
default:
LOG.warn("Unexpected compaction type " + dbValue);
return null;
@@ -5336,6 +5344,8 @@ static Character thriftCompactionType2DbType(CompactionType ct) {
return MAJOR_TYPE;
case MINOR:
return MINOR_TYPE;
+ case OPTIMIZE:
+ return OPTIMIZE_TYPE;
default:
LOG.warn("Unexpected compaction type " + ct);
return null;