diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java index 074430ce7f..43a216b0cf 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java @@ -301,7 +301,6 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); CompactorTestUtil.runCleaner(conf); verifySuccessulTxn(1); - List compacts; // Insert test data into test table dataProvider.insertMmTestData(tableName); // Run a compaction @@ -341,7 +340,6 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); CompactorTestUtil.runCleaner(conf); verifySuccessulTxn(1); - List compacts; // Verify delta directories after compaction Assert.assertEquals("Delta directories does not match after minor compaction", Collections.singletonList("delta_0000001_0000003_v0000007"), @@ -382,7 +380,6 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); CompactorTestUtil.runCleaner(conf); verifySuccessulTxn(1); - List compacts; // Verify base directory after compaction Assert.assertEquals("Base directory does not match after major compaction", Collections.singletonList("base_0000003_v0000007"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java new file mode 100644 index 0000000000..79e8cb02c8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.txn.compactor; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.DirectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.HiveStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Builds query strings that help with query-based compaction of CRUD and insert-only tables. + */ +class CompactionQueryBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class.getName()); + + // required fields, set in constructor + private final Operation operation; + private String resultTableName; + + // required for some types of compaction. Required... + private Table sourceTab; // for Create and for Insert in CRUD and insert-only major + private StorageDescriptor storageDescriptor; // for Create in insert-only + private String location; // for Create + private ValidWriteIdList validWriteIdList; // for Alter/Insert in minor and CRUD + private AcidUtils.Directory dir; // for Alter in minor + private Partition sourcePartition; // for Insert in major and insert-only minor + private String fromTableName; // for Insert + + // settable booleans + private boolean isPartitioned; // for Create + private boolean isBucketed; // for Create in CRUD + private boolean isDeleteDelta; // for Alter in CRUD minor + + // internal use only, for legibility + private final boolean major; + private final boolean minor; + private final boolean crud; + private final boolean insertOnly; + + enum CompactionType { + MAJOR_CRUD, MINOR_CRUD, MAJOR_INSERT_ONLY, MINOR_INSERT_ONLY + } + + enum Operation { + CREATE, ALTER, INSERT, DROP + } + + /** + * Set source table – the table to compact. + * Required for Create operations and for Insert operations in crud and insert-only major + * compaction. + */ + CompactionQueryBuilder setSourceTab(Table sourceTab) { + Preconditions.checkNotNull(sourceTab); + this.sourceTab = sourceTab; + return this; + } + + /** + * Set the StorageDescriptor of the table or partition to compact. + * Required for Create operations in insert-only compaction. + */ + CompactionQueryBuilder setStorageDescriptor(StorageDescriptor storageDescriptor) { + Preconditions.checkNotNull(storageDescriptor); + this.storageDescriptor = storageDescriptor; + return this; + } + + /** + * Set the location of the temp tables. 
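+ * If no location is set, the LOCATION clause is omitted and Hive uses its default location for temporary tables.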
+ * Used for Create operations. + */ + CompactionQueryBuilder setLocation(String location) { + this.location = location; + return this; + } + + /** + * Set list of valid write ids. + * Required for Alter and Insert operations in crud minor compaction. + */ + CompactionQueryBuilder setValidWriteIdList(ValidWriteIdList validWriteIdList) { + Preconditions.checkNotNull(validWriteIdList); + this.validWriteIdList = validWriteIdList; + return this; + } + + /** + * Set Acid Directory. + * Required for Alter operations in minor compaction. + */ + CompactionQueryBuilder setDir(AcidUtils.Directory dir) { + Preconditions.checkNotNull(dir); + this.dir = dir; + return this; + } + + /** + * Set partition to compact, if we are compacting a partition. + * Required for Insert operations in major and insert-only minor compaction. + */ + CompactionQueryBuilder setSourcePartition(Partition sourcePartition) { + this.sourcePartition = sourcePartition; + return this; + } + + /** + * Set table to select from. + * Required for Insert operations. + */ + CompactionQueryBuilder setFromTableName(String fromTableName) { + Preconditions.checkNotNull(fromTableName); + this.fromTableName = fromTableName; + return this; + } + + /** + * If true, Create operations will result in a table with partition column `file_name`. + */ + CompactionQueryBuilder setPartitioned(boolean partitioned) { + isPartitioned = partitioned; + return this; + } + + /** + * If true, Create operations for CRUD minor compaction will result in a bucketed table. + */ + CompactionQueryBuilder setBucketed(boolean bucketed) { + isBucketed = bucketed; + return this; + } + + /** + * If true, during CRUD minor compaction, Alter operations will result in the temp table's + * partitions pointing to delete delta directories as opposed to insert deltas' directories (see + * MinorQueryCompactor for details). + */ + CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) { + isDeleteDelta = deleteDelta; + return this; + } + + /** + * Construct a CompactionQueryBuilder with required params. + * + * @param compactionType major or minor; crud or insert-only, e.g. CompactionType.MAJOR_CRUD + * @param operation query's Operation e.g. Operation.CREATE + * @param resultTableName the name of the table we are running the operation on + */ + CompactionQueryBuilder(CompactionType compactionType, Operation operation, + String resultTableName) { + Preconditions.checkNotNull(operation, "CompactionQueryBuilder.Operation cannot be null"); + this.operation = operation; + this.resultTableName = resultTableName; + major = compactionType == CompactionType.MAJOR_CRUD + || compactionType == CompactionType.MAJOR_INSERT_ONLY; + crud = + compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD; + minor = !major; + insertOnly = !crud; + } + + /** + * Build the query string based on parameters. + * + * @return query string wrapped in Optional container + */ + Optional<String> buildOptional() { + String result = build(); + if ("".equals(result)) { + return Optional.empty(); + } + return Optional.of(result); + } + + /** + * Build the query string based on parameters. 
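+ * For example, a Create operation for CRUD compaction yields a query of the form + * "CREATE temporary external table <resultTableName>(`operation` int, `originalTransaction` bigint, ...) + * stored as orc TBLPROPERTIES (...)".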
+ * + * @return query String + */ + String build() { + StringBuilder query = new StringBuilder(operation.toString()); + + if (operation == Operation.CREATE) { + query.append(" temporary external"); + } + if (operation == Operation.INSERT) { + query.append(" into"); + } + query.append(" table "); + + if (operation == Operation.DROP) { + query.append("if exists "); + } + + query.append(resultTableName); + + switch (operation) { + case CREATE: + getDdlForCreate(query); + break; + case ALTER: + buildAddClauseForAlter(query); + break; + case INSERT: + query.append(" select "); + buildSelectClauseForInsert(query); + query.append(" from ") + .append(fromTableName); + buildWhereClauseForInsert(query); + break; + case DROP: + default: + } + + return query.toString(); + } + + private void buildAddClauseForAlter(StringBuilder query) { + if (validWriteIdList == null || dir == null) { + query.setLength(0); + return; // avoid NPEs, don't throw an exception but return an empty query + } + long minWriteID = + validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); + long highWatermark = validWriteIdList.getHighWatermark(); + List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter( + delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark + && delta.getMinWriteId() >= minWriteID) + .collect(Collectors.toList()); + if (deltas.isEmpty()) { + query.setLength(0); // no alter query needed; clear StringBuilder + return; + } + query.append(" add "); + deltas.forEach(delta -> query.append("partition (file_name='") + .append(delta.getPath().getName()).append("')" + + " location '").append(delta.getPath()).append("' ")); + } + + + private void buildSelectClauseForInsert(StringBuilder query) { + // Need list of columns for major crud, mm major partitioned, mm minor + List<FieldSchema> cols; + if (major && crud || major && insertOnly && sourcePartition != null || minor && insertOnly) { + if (sourceTab == null) { + return; // avoid NPEs, don't throw an exception but skip this part of the query + } + cols = sourceTab.getSd().getCols(); + } else { + cols = null; + } + + if (crud) { + if (major) { + query.append( + "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), " + + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, " + + "NAMED_STRUCT("); + for (int i = 0; i < cols.size(); ++i) { + query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ") + .append(cols.get(i).getName()); + } + query.append(") "); + } else { // minor + query.append( + "`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`"); + } + + } else { // mm + if (major) { + if (sourcePartition != null) { // mm major and partitioned + for (int i = 0; i < cols.size(); ++i) { + query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`"); + } + } else { // mm major and unpartitioned + query.append("*"); + } + } else { // mm minor + for (int i = 0; i < cols.size(); ++i) { + query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`"); + } + } + } + } + + private void buildWhereClauseForInsert(StringBuilder query) { + if (major && sourcePartition != null && sourceTab != null) { + List<String> vals = sourcePartition.getValues(); + List<FieldSchema> keys = sourceTab.getPartitionKeys(); + if (keys.size() != vals.size()) { + throw new IllegalStateException("source partition values (" + + Arrays.toString(vals.toArray()) + ") do not match source table values (" + + Arrays.toString(keys.toArray()) + "). 
Failing compaction."); + } + + query.append(" where "); + for (int i = 0; i < keys.size(); ++i) { + query.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='") + .append(vals.get(i)).append("'"); + } + } + + if (minor && crud && validWriteIdList != null) { + long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds(); + if (invalidWriteIds.length > 0) { + query.append(" where `originalTransaction` not in (").append( + org.apache.commons.lang3.StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")) + .append(")"); + } + } + } + + private void getDdlForCreate(StringBuilder query) { + defineColumns(query); + + // PARTITIONED BY. Used for parts of minor compaction. + if (isPartitioned) { + query.append(" PARTITIONED BY (`file_name` STRING) "); + } + + // CLUSTERED BY. (bucketing) + int bucketingVersion = 0; + if (crud && minor) { + bucketingVersion = getMinorCrudBucketing(query, bucketingVersion); + } else if (insertOnly) { + getMmBucketing(query); + } + + // SKEWED BY + if (insertOnly) { + getSkewedByClause(query); + } + + // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT + if (crud) { + query.append(" stored as orc"); + } else { + copySerdeFromSourceTable(query); + } + + // LOCATION + if (location != null) { + query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + } + + // TBLPROPERTIES + addTblProperties(query, bucketingVersion); + } + + /** + * Define columns of the create query. + */ + private void defineColumns(StringBuilder query) { + if (sourceTab == null) { + return; // avoid NPEs, don't throw an exception but skip this part of the query + } + query.append("("); + if (crud) { + query.append( + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " + + "`currentTransaction` bigint, `row` struct<"); + } + List cols = sourceTab.getSd().getCols(); + boolean isFirst = true; + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` "); + query.append(crud ? ":" : ""); + query.append(col.getType()); + } + query.append(crud ? ">" : ""); + query.append(") "); + } + + /** + * Part of Create operation. Copy source table bucketing for insert-only compaction. + */ + private void getMmBucketing(StringBuilder query) { + if (sourceTab == null) { + return; // avoid NPEs, don't throw an exception but skip this part of the query + } + boolean isFirst; + List buckCols = sourceTab.getSd().getBucketCols(); + if (buckCols.size() > 0) { + query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); + List sortCols = sourceTab.getSd().getSortCols(); + if (sortCols.size() > 0) { + query.append("SORTED BY ("); + isFirst = true; + for (Order sortCol : sortCols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append(sortCol.getCol()).append(" ") + .append(DirectionUtils.codeToText(sortCol.getOrder())); + } + query.append(") "); + } + query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS"); + } + } + + /** + * Part of Create operation. Minor crud compaction uses its own bucketing system. 
+ */ + private int getMinorCrudBucketing(StringBuilder query, int bucketingVersion) { + if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs + int numBuckets = 1; + try { + org.apache.hadoop.hive.ql.metadata.Table t = + Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName()); + numBuckets = Math.max(t.getNumBuckets(), numBuckets); + bucketingVersion = t.getBucketingVersion(); + } catch (HiveException e) { + LOG.info("Error finding table {}. Minor compaction result will fall back to 1 bucket.", + sourceTab.getTableName()); + } finally { + query.append(" clustered by (`bucket`)") + .append(" sorted by (`bucket`, `originalTransaction`, `rowId`)") + .append(" into ").append(numBuckets).append(" buckets"); + } + } + return bucketingVersion; + } + + /** + * Part of Create operation. Insert-only compaction tables copy the source table's skew. + */ + private void getSkewedByClause(StringBuilder query) { + if (sourceTab == null) { + return; // avoid NPEs, don't throw an exception but skip this part of the query + } + boolean isFirst; // Stored as directories. We don't care about the skew otherwise. + if (sourceTab.getSd().isStoredAsSubDirectories()) { + SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo(); + if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { + query.append(" SKEWED BY (") + .append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); + isFirst = true; + for (List<String> colValues : skewedInfo.getSkewedColValues()) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("('").append(StringUtils.join("','", colValues)).append("')"); + } + query.append(") STORED AS DIRECTORIES"); + } + } + } + + /** + * Part of Create operation. Insert-only compaction tables copy source tables' serde. + */ + private void copySerdeFromSourceTable(StringBuilder query) { + if (storageDescriptor == null) { + return; // avoid NPEs, don't throw an exception but skip this part of the query + } + ensureTableToCompactIsNative(); + SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo(); + Map<String, String> serdeParams = serdeInfo.getParameters(); + query.append(" ROW FORMAT SERDE '") + .append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())).append("'"); + // WITH SERDEPROPERTIES + if (!serdeParams.isEmpty()) { + ShowCreateTableOperation.appendSerdeParams(query, serdeParams); + } + query.append("STORED AS INPUTFORMAT '") + .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'") + .append(" OUTPUTFORMAT '") + .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat())) + .append("'"); + } + + /** + * Part of Create operation. No tmp table is transactional. Additionally: + * - Crud compaction temp tables are marked as compaction tables via the "compactiontable" tblproperty. + * - Minor crud compaction temp tables carry the bucketing version tblproperty, if the table is bucketed. + * - Insert-only compaction tables copy the source table's tblproperties, except metastore/statistics + * properties. 
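+ * e.g. TBLPROPERTIES ('transactional'='false', 'compactiontable'='true', 'bucketing_version'='2')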
+ */ + private void addTblProperties(StringBuilder query, int bucketingVersion) { + Map<String, String> tblProperties = new HashMap<>(); + tblProperties.put("transactional", "false"); + if (crud) { + tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, "true"); + } + if (crud && minor && isBucketed) { + tblProperties.put("bucketing_version", String.valueOf(bucketingVersion)); + } + if (insertOnly && sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null + // Exclude all standard table properties. + Set<String> excludes = getHiveMetastoreConstants(); + excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); + for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) { + if (e.getValue() == null) { + continue; + } + if (excludes.contains(e.getKey())) { + continue; + } + tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue())); + } + } + + // add TBLPROPERTIES clause to query + boolean isFirst; + query.append(" TBLPROPERTIES ("); + isFirst = true; + for (Map.Entry<String, String> property : tblProperties.entrySet()) { + if (!isFirst) { + query.append(", "); + } + query.append("'").append(property.getKey()).append("'='").append(property.getValue()) + .append("'"); + isFirst = false; + } + query.append(")"); + } + + private static Set<String> getHiveMetastoreConstants() { + Set<String> result = new HashSet<>(); + for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { + if (!Modifier.isStatic(f.getModifiers())) { + continue; + } + if (!Modifier.isFinal(f.getModifiers())) { + continue; + } + if (!String.class.equals(f.getType())) { + continue; + } + f.setAccessible(true); + try { + result.add((String) f.get(null)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return result; + } + + private void ensureTableToCompactIsNative() { + if (sourceTab == null) { + return; + } + String storageHandler = + sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); + if (storageHandler != null) { + String message = "Table " + sourceTab.getTableName() + " has a storage handler (" + + storageHandler + "). 
Failing compaction for this non-native table."; + LOG.error(message); + throw new RuntimeException(message); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 9385080713..f47c23a6de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -21,14 +21,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import java.io.IOException; import java.util.List; @@ -40,7 +38,7 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); @@ -83,57 +81,34 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, * (current write id will be the same as original write id). * We will be achieving the ordering via a custom split grouper for compactor. * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description. - * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)} - * for details on the mechanism. */ - private List getCreateQueries(String fullName, Table t, String tmpTableLocation) throws HiveException { - StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(fullName, t)); - org.apache.hadoop.hive.ql.metadata.Table table = Hive.get().getTable(t.getDbName(), t.getTableName(), false); - int numBuckets = 1; - int bucketingVersion = 0; - if (table != null) { - numBuckets = Math.max(table.getNumBuckets(), numBuckets); - bucketingVersion = table.getBucketingVersion(); - } - query.append(" clustered by (`bucket`) into ").append(numBuckets).append(" buckets"); - query.append(" stored as orc"); - query.append(" location '"); - query.append(tmpTableLocation); - query.append("' tblproperties ('transactional'='false',"); - query.append(" 'bucketing_version'='"); - query.append(bucketingVersion); - query.append("','"); - query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY); - query.append("'='true'"); - query.append(")"); - return Lists.newArrayList(query.toString()); + private List getCreateQueries(String fullName, Table t, String tmpTableLocation) { + return Lists.newArrayList(new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MAJOR_CRUD, + CompactionQueryBuilder.Operation.CREATE, + fullName) + .setSourceTab(t) + .setLocation(tmpTableLocation) + .build()); } private List getCompactionQueries(Table t, Partition p, String tmpName) { - String fullName = t.getDbName() + "." 
+ t.getTableName(); - StringBuilder query = new StringBuilder("insert into table " + tmpName + " "); - StringBuilder filter = new StringBuilder(); - if (p != null) { - filter.append(" where "); - List vals = p.getValues(); - List keys = t.getPartitionKeys(); - assert keys.size() == vals.size(); - for (int i = 0; i < keys.size(); ++i) { - filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i)) - .append("'"); - } - } - query.append(" select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, " - + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT("); - List cols = t.getSd().getCols(); - for (int i = 0; i < cols.size(); ++i) { - query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName()); - } - query.append(") from ").append(fullName).append(filter); - return Lists.newArrayList(query.toString()); + return Lists.newArrayList( + new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MAJOR_CRUD, + CompactionQueryBuilder.Operation.INSERT, + tmpName) + .setFromTableName(t.getTableName()) + .setSourceTab(t) + .setSourcePartition(p) + .build()); } private List getDropQueries(String tmpTableName) { - return Lists.newArrayList("drop table if exists " + tmpTableName); + return Lists.newArrayList( + new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MAJOR_CRUD, + CompactionQueryBuilder.Operation.DROP, + tmpTableName).build()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index 01cd2fc93d..75ac9bc1e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -28,7 +27,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; @@ -38,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** * Class responsible for handling query based minor compaction. @@ -66,7 +63,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis(); List createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor); - List compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds()); + List compactionQueries = getCompactionQueries(tmpTableName, table, writeIds); List dropQueries = getDropQueries(tmpTableName); runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries, @@ -151,40 +148,16 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, *
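+ * The query creates a temp table that mirrors the ACID file schema: `operation`, `originalTransaction`, + * `bucket`, `rowId`, `currentTransaction` and `row`.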

*/ private String buildCreateTableQuery(Table table, String newTableName, boolean isPartitioned, - boolean isBucketed, String location) throws HiveException { - StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(newTableName, table)); - if (isPartitioned) { - query.append(" partitioned by (`file_name` string)"); - } - int bucketingVersion = 0; - if (isBucketed) { - int numBuckets = 1; - org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName(), false); - if (t != null) { - numBuckets = Math.max(t.getNumBuckets(), numBuckets); - bucketingVersion = t.getBucketingVersion(); - } - query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)") - .append(" into ").append(numBuckets).append(" buckets"); - } - query.append(" stored as orc"); - if (location != null && !location.isEmpty()) { - query.append(" location '"); - query.append(location); - query.append("'"); - } - query.append(" tblproperties ('transactional'='false'"); - query.append(", '"); - query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY); - query.append("'='true'"); - if (isBucketed) { - query.append(", 'bucketing_version'='") - .append(bucketingVersion) - .append("')"); - } else { - query.append(")"); - } - return query.toString(); + boolean isBucketed, String location) { + return new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_CRUD, + CompactionQueryBuilder.Operation.CREATE, + newTableName) + .setSourceTab(table) + .setBucketed(isBucketed) + .setPartitioned(isPartitioned) + .setLocation(location) + .build(); } /** @@ -198,40 +171,32 @@ private String buildCreateTableQuery(Table table, String newTableName, boolean i */ private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir, ValidWriteIdList validWriteIdList, boolean isDeleteDelta) { - // add partitions - if (!dir.getCurrentDirectories().isEmpty()) { - long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); - long highWatermark = validWriteIdList.getHighWatermark(); - List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter( - delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark - && delta.getMinWriteId() >= minWriteID) - .collect(Collectors.toList()); - if (!deltas.isEmpty()) { - StringBuilder query = new StringBuilder().append("alter table ").append(tableName); - query.append(" add "); - deltas.forEach( - delta -> query.append("partition (file_name='").append(delta.getPath().getName()).append("') location '") - .append(delta.getPath()).append("' ")); - return Optional.of(query.toString()); - } - } - return Optional.empty(); + return new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_CRUD, + CompactionQueryBuilder.Operation.ALTER, + tableName) + .setDir(dir) + .setValidWriteIdList(validWriteIdList) + .setIsDeleteDelta(isDeleteDelta) + .buildOptional(); } /** * Get a list of compaction queries which fills up the delta/delete-delta temporary result tables. * @param tmpTableBase a unique identifier, which helps to find all the temporary tables - * @param invalidWriteIds list of invalid write IDs. This list is used to filter out aborted/open transactions + * @param table the table to compact + * @param validWriteIdList list of valid write IDs. 
This list is used to filter out aborted/open + * transactions + * @return list of compaction queries, always non-null + */ - private List<String> getCompactionQueries(String tmpTableBase, long[] invalidWriteIds) { + private List<String> getCompactionQueries(String tmpTableBase, Table table, ValidWriteIdList validWriteIdList) { List<String> queries = new ArrayList<>(); String sourceTableName = AcidUtils.DELTA_PREFIX + tmpTableBase; String resultTableName = sourceTableName + "_result"; - queries.add(buildCompactionQuery(sourceTableName, resultTableName, invalidWriteIds)); + queries.add(buildCompactionQuery(sourceTableName, resultTableName, table, validWriteIdList)); String sourceDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase; String resultDeleteTableName = sourceDeleteTableName + "_result"; - queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, invalidWriteIds)); + queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, table, validWriteIdList)); return queries; } @@ -240,19 +205,20 @@ private String buildCreateTableQuery(Table table, String newTableName, boolean i * it into the result table, filtering out all rows which belong to open/aborted transactions. * @param sourceTableName the name of the source table * @param resultTableName the name of the result table - * @param invalidWriteIds list of invalid write IDs + * @param table the table to compact + * @param validWriteIdList list of valid write IDs * @return compaction query, always non-null */ - private String buildCompactionQuery(String sourceTableName, String resultTableName, long[] invalidWriteIds) { - StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName) - .append(" select `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from ") - .append(sourceTableName); - if (invalidWriteIds.length > 0) { - query.append(" where `originalTransaction` not in (") - .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")"); - } - - return query.toString(); + private String buildCompactionQuery(String sourceTableName, String resultTableName, Table table, + ValidWriteIdList validWriteIdList) { + return new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_CRUD, + CompactionQueryBuilder.Operation.INSERT, + resultTableName) + .setFromTableName(sourceTableName) + .setSourceTab(table) + .setValidWriteIdList(validWriteIdList) + .build(); } /** @@ -261,12 +227,17 @@ private String buildCompactionQuery(String sourceTableName, String resultTableNa * @return list of drop table statements, always non-null */ private List<String> getDropQueries(String tmpTableBase) { - List<String> queries = new ArrayList<>(); - String dropStm = "drop table if exists "; - queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase); - queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase); - queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase + "_result"); - queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result"); - return queries; + return Lists.newArrayList( + getDropQuery(AcidUtils.DELTA_PREFIX + tmpTableBase), + getDropQuery(AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase), + getDropQuery(AcidUtils.DELTA_PREFIX + tmpTableBase + "_result"), + getDropQuery(AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result")); + } + + private String getDropQuery(String tableToDrop) { + return new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_CRUD, + CompactionQueryBuilder.Operation.DROP, 
+ tableToDrop).build(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 41fdd7e210..114b6f7a74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -52,7 +51,7 @@ AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); - MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); + QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir); String tmpLocation = Util.generateTmpPath(storageDescriptor); Path baseLocation = new Path(tmpLocation, "_base"); @@ -106,42 +105,36 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, private List getCreateQueries(String tmpTableName, Table table, StorageDescriptor storageDescriptor, String baseLocation) { - return Lists.newArrayList(MmQueryCompactorUtils - .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false)); + return Lists.newArrayList( + new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.CREATE, + tmpTableName) + .setSourceTab(table) + .setStorageDescriptor(storageDescriptor) + .setLocation(baseLocation) + .build() + ); } private List getCompactionQueries(Table t, Partition p, String tmpName) { - String fullName = t.getDbName() + "." + t.getTableName(); - // ideally we should make a special form of insert overwrite so that we: - // 1) Could use fast merge path for ORC and RC. - // 2) Didn't have to create a table. - - StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " "); - StringBuilder filter = new StringBuilder(); - if (p != null) { - filter = new StringBuilder(" where "); - List vals = p.getValues(); - List keys = t.getPartitionKeys(); - assert keys.size() == vals.size(); - for (int i = 0; i < keys.size(); ++i) { - filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i)) - .append("'"); - } - query.append(" select "); - // Use table descriptor for columns. - List cols = t.getSd().getCols(); - for (int i = 0; i < cols.size(); ++i) { - query.append(i == 0 ? 
"`" : ", `").append(cols.get(i).getName()).append("`"); - } - } else { - query.append("select *"); - } - query.append(" from ").append(fullName).append(filter); - return Lists.newArrayList(query.toString()); + return Lists.newArrayList( + new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.INSERT, + tmpName) + .setSourceTab(t) + .setFromTableName(t.getTableName()) + .setSourcePartition(p) + .build() + ); } private List getDropQueries(String tmpTableName) { - return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName); + return Lists.newArrayList( + new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.DROP, + tmpTableName).build()); } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index feb667cba9..e93612c2cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -39,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** * Run a minor query compaction on an insert only (MM) table. @@ -58,7 +56,7 @@ AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); - MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); + QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir); String tmpLocation = Util.generateTmpPath(storageDescriptor); Path sourceTabLocation = new Path(tmpLocation); Path resultTabLocation = new Path(tmpLocation, "_result"); @@ -66,14 +64,15 @@ HiveConf driverConf = setUpDriverSession(hiveConf); String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_"; - String tmpTableBase = tmpPrefix + System.currentTimeMillis(); + String tmpTableName = tmpPrefix + System.currentTimeMillis(); + String resultTmpTableName = tmpTableName + "_result"; List createTableQueries = - getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(), + getCreateQueries(tmpTableName, table, partition == null ? 
table.getSd() : partition.getSd(), sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds); - List compactionQueries = getCompactionQueries(tmpTableBase, table.getSd()); - List dropQueries = getDropQueries(tmpTableBase); - runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo, + List compactionQueries = getCompactionQueries(tmpTableName, resultTmpTableName, table); + List dropQueries = getDropQueries(tmpTableName); + runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createTableQueries, compactionQueries, dropQueries); } @@ -127,14 +126,25 @@ String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir, ValidWriteIdList validWriteIdList) { List queries = new ArrayList<>(); - queries.add( - MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true)); + queries.add(getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true)); buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add); - queries.add(MmQueryCompactorUtils - .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false)); + queries.add(getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false)); return queries; } + private String getCreateQuery(String resultTableName, Table t, StorageDescriptor sd, + String location, boolean isPartitioned) { + return new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.CREATE, + resultTableName) + .setSourceTab(t) + .setStorageDescriptor(sd) + .setLocation(location) + .setPartitioned(isPartitioned) + .build(); + } + /** * Builds an alter table query, which adds partitions pointing to location of delta directories. * @@ -145,23 +155,11 @@ */ private Optional buildAlterTableQuery(String tableName, AcidUtils.Directory dir, ValidWriteIdList validWriteIdList) { - if (!dir.getCurrentDirectories().isEmpty()) { - long minWriteID = - validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); - long highWatermark = validWriteIdList.getHighWatermark(); - List deltas = dir.getCurrentDirectories().stream().filter( - delta -> delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID) - .collect(Collectors.toList()); - if (!deltas.isEmpty()) { - StringBuilder query = new StringBuilder().append("alter table ").append(tableName); - query.append(" add "); - deltas.forEach( - delta -> query.append("partition (file_name='").append(delta.getPath().getName()) - .append("') location '").append(delta.getPath()).append("' ")); - return Optional.of(query.toString()); - } - } - return Optional.empty(); + return new CompactionQueryBuilder(CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.ALTER, tableName) + .setDir(dir) + .setValidWriteIdList(validWriteIdList) + .buildOptional(); } /** @@ -171,24 +169,21 @@ *
  • insert into table $tmpTableBase_result select `col_1`, .. from tmpTableBase
  • * * - * @param tmpTableBase an unique identifier, which helps to find all the temporary tables + * @param sourceTmpTableName an unique identifier, which helps to find all the temporary tables + * @param resultTmpTableName * @return list of compaction queries, always non-null */ - private List getCompactionQueries(String tmpTableBase, StorageDescriptor sd) { - String resultTableName = tmpTableBase + "_result"; - StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName) - .append(" select "); - List cols = sd.getCols(); - boolean isFirst = true; - for (FieldSchema col : cols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("`").append(col.getName()).append("`"); - } - query.append(" from ").append(tmpTableBase); - return Lists.newArrayList(query.toString()); + private List getCompactionQueries(String sourceTmpTableName, String resultTmpTableName, + Table sourceTable) { + return Lists.newArrayList( + new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.INSERT, + resultTmpTableName) + .setFromTableName(sourceTmpTableName) + .setSourceTab(sourceTable) + .build() + ); } /** @@ -197,8 +192,17 @@ * @return list of drop table statements, always non-null */ private List getDropQueries(String tmpTableBase) { - return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase, - MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase + "_result"); + return Lists.newArrayList( + getDropQuery(tmpTableBase), + getDropQuery(tmpTableBase + "_result") + ); + } + + private String getDropQuery(String tableToDrop) { + return new CompactionQueryBuilder( + CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY, + CompactionQueryBuilder.Operation.DROP, + tableToDrop).build(); } private HiveConf setUpDriverSession(HiveConf hiveConf) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java deleted file mode 100644 index 891696dba7..0000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.txn.compactor; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.util.DirectionUtils; -import org.apache.hadoop.util.StringUtils; -import org.apache.hive.common.util.HiveStringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -final class MmQueryCompactorUtils { - - private static final Logger LOG = LoggerFactory.getLogger(MmQueryCompactorUtils.class.getName()); - static final String DROP_IF_EXISTS = "drop table if exists "; - - private MmQueryCompactorUtils() {} - - /** - * Creates a command to create a new table based on an example table (sourceTab). - * - * @param fullName of new table - * @param sourceTab the table we are modeling the new table on - * @param sd StorageDescriptor of the table or partition we are modeling the new table on - * @param location of the new table - * @param isPartitioned should the new table be partitioned - * @param isExternal should the new table be external - * @return query string creating the new table - */ - static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd, - String location, boolean isPartitioned, boolean isExternal) { - StringBuilder query = new StringBuilder("create temporary "); - if (isExternal) { - query.append("external "); - } - query.append("table ").append(fullName).append("("); - List cols = sourceTab.getSd().getCols(); - boolean isFirst = true; - for (FieldSchema col : cols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("`").append(col.getName()).append("` ").append(col.getType()); - } - query.append(") "); - - // Partitioning. Used for minor compaction. - if (isPartitioned) { - query.append(" PARTITIONED BY (`file_name` STRING) "); - } - - // Bucketing. - List buckCols = sourceTab.getSd().getBucketCols(); - if (buckCols.size() > 0) { - query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); - List sortCols = sourceTab.getSd().getSortCols(); - if (sortCols.size() > 0) { - query.append("SORTED BY ("); - isFirst = true; - for (Order sortCol : sortCols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder())); - } - query.append(") "); - } - query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS"); - } - - // Stored as directories. We don't care about the skew otherwise. 
- if (sourceTab.getSd().isStoredAsSubDirectories()) { - SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo(); - if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { - query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); - isFirst = true; - for (List colValues : skewedInfo.getSkewedColValues()) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("('").append(StringUtils.join("','", colValues)).append("')"); - } - query.append(") STORED AS DIRECTORIES"); - } - } - - SerDeInfo serdeInfo = sd.getSerdeInfo(); - Map serdeParams = serdeInfo.getParameters(); - query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())) - .append("'"); - String sh = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); - assert sh == null; // Not supposed to be a compactable table. - if (!serdeParams.isEmpty()) { - ShowCreateTableOperation.appendSerdeParams(query, serdeParams); - } - query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat())) - .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())) - .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); - // Exclude all standard table properties. - Set excludes = getHiveMetastoreConstants(); - excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); - isFirst = true; - for (Map.Entry e : sourceTab.getParameters().entrySet()) { - if (e.getValue() == null) { - continue; - } - if (excludes.contains(e.getKey())) { - continue; - } - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue())) - .append("'"); - } - if (!isFirst) { - query.append(", "); - } - query.append("'transactional'='false')"); - return query.toString(); - - } - - private static Set getHiveMetastoreConstants() { - Set result = new HashSet<>(); - for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { - if (!Modifier.isStatic(f.getModifiers())) { - continue; - } - if (!Modifier.isFinal(f.getModifiers())) { - continue; - } - if (!String.class.equals(f.getType())) { - continue; - } - f.setAccessible(true); - try { - result.add((String) f.get(null)); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - return result; - } - - /** - * Remove the delta directories of aborted transactions. 
- */ - static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException { - List<Path> filesToDelete = dir.getAbortedDirectories(); - if (filesToDelete.size() < 1) { - return; - } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); - FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); - fs.delete(dead, true); - } - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 7a9e48ff1e..7f3ccfa04e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -23,12 +23,12 @@ import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -161,31 +161,6 @@ private static void disableLlapCaching(HiveConf conf) { } } - /** - * Get a create temporary table query string with Orc ACID columns. - * @param tableName name of the new temporary table - * @param table the table where the compaction is running - * @return create query - */ - static String getCreateTempTableQueryWithAcidColumns(String tableName, Table table) { - StringBuilder query = new StringBuilder("create temporary external table ").append(tableName).append(" ("); - // Acid virtual columns - query.append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` " + "bigint, `row` struct<"); - List<FieldSchema> cols = table.getSd().getCols(); - boolean isFirst = true; - // Actual columns - for (FieldSchema col : cols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("`").append(col.getName()).append("` ").append(":").append(col.getType()); - } - query.append(">)"); - return query.toString(); - } - /** * Remove the root directory of a table if it's empty. * @param conf the Hive configuration @@ -203,5 +178,20 @@ static void cleanupEmptyDir(HiveConf conf, String tmpTableName) throws IOExcepti } } } + /** + * Remove the delta directories of aborted transactions. + */ + static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException { + List<Path> filesToDelete = dir.getAbortedDirectories(); + if (filesToDelete.size() < 1) { + return; + } + LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); + FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + for (Path dead : filesToDelete) { + LOG.debug("Going to delete path " + dead.toString()); + fs.delete(dead, true); + } + } } }
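Note: taken together, the patch replaces each compactor's hand-rolled StringBuilder logic with the single fluent CompactionQueryBuilder. As a rough usage sketch (variable names such as table, dir, writeIds, tmpLocation and tmpTableName are illustrative, not part of the patch), a minor CRUD compactor now assembles its queries like this:

    // Build the CREATE statement for the temp table backing the compaction.
    String createQuery = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
        CompactionQueryBuilder.Operation.CREATE,
        tmpTableName)
        .setSourceTab(table)          // metastore Table being compacted
        .setBucketed(true)
        .setPartitioned(true)
        .setLocation(tmpLocation)
        .build();

    // An ALTER may legitimately produce no statement (no deltas in the valid
    // write-id range), hence buildOptional() rather than build():
    Optional<String> alterQuery = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
        CompactionQueryBuilder.Operation.ALTER,
        tmpTableName)
        .setDir(dir)
        .setValidWriteIdList(writeIds)
        .setIsDeleteDelta(false)
        .buildOptional();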