diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 41fdd7e210..c657e4e778 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -29,7 +28,6 @@
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -54,9 +52,6 @@
         table.getParameters(), false);
     MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
 
-    String tmpLocation = Util.generateTmpPath(storageDescriptor);
-    Path baseLocation = new Path(tmpLocation, "_base");
-
     // Set up the session for driver.
     HiveConf driverConf = new HiveConf(hiveConf);
     driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
@@ -65,9 +60,8 @@
     // "insert overwrite directory" command if there were no bucketing or list bucketing.
     String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-    List<String> createTableQueries =
-        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
-            baseLocation.toString());
+    List<String> createTableQueries = getCreateQueries(tmpTableName, table,
+        partition == null ? table.getSd() : partition.getSd(), writeIds, driverConf);
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
@@ -82,32 +76,23 @@
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
-    String from = tempTable.getSd().getLocation();
-    Path fromPath = new Path(from), toPath = new Path(dest);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    // Assume the high watermark can be used as maximum transaction ID.
-    //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
-    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
-            .statementId(-1).visibilityTxnId(compactorTxnId);
-    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
-      fs.mkdirs(newBaseDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + dest);
-    fs.rename(fromPath, newBaseDir);
-    fs.delete(fromPath, true);
+    Util.cleanupEmptyDir(conf, tmpTableName);
   }
 
   private List<String> getCreateQueries(String tmpTableName, Table table,
-      StorageDescriptor storageDescriptor, String baseLocation) {
-    return Lists.newArrayList(MmQueryCompactorUtils
-        .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false));
+      StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, HiveConf conf) {
+
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWaterMark = writeIds.getHighWatermark();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true)
+        .isCompressed(false).minimumWriteId(minOpenWriteId)
+        .maximumWriteId(highWaterMark).bucket(0).statementId(-1).visibilityTxnId(compactorTxnId);
+    Path tmpTablePath =
+        AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options);
+
+    return Lists.newArrayList(MmQueryCompactorUtils.getCreateQuery(tmpTableName, table,
+        storageDescriptor, tmpTablePath.toString(), false));
   }
 
   private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index feb667cba9..5dec9146d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -29,7 +28,6 @@
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -59,18 +57,14 @@
         .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
             Ref.from(false), false,
             table.getParameters(), false);
     MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
-    String tmpLocation = Util.generateTmpPath(storageDescriptor);
-    Path sourceTabLocation = new Path(tmpLocation);
-    Path resultTabLocation = new Path(tmpLocation, "_result");
 
     HiveConf driverConf = setUpDriverSession(hiveConf);
 
     String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
     String tmpTableBase = tmpPrefix + System.currentTimeMillis();
-    List<String> createTableQueries =
-        getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(),
-            sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
+    List<String> createTableQueries = getCreateQueries(tmpTableBase, table,
+        partition == null ? table.getSd() : partition.getSd(), dir, writeIds, hiveConf);
     List<String> compactionQueries = getCompactionQueries(tmpTableBase, table.getSd());
     List<String> dropQueries = getDropQueries(tmpTableBase);
     runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo,
@@ -82,26 +76,7 @@
    */
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table resultTable =
-        Hive.get().getTable(tmpTableName + "_result");
-    String from = resultTable.getSd().getLocation();
-    Path fromPath = new Path(from);
-    Path toPath = new Path(dest);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false)
-            .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1)
-            .visibilityTxnId(compactorTxnId);
-    Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found. Assuming 0 splits. Creating " + newDeltaDir);
-      fs.mkdirs(newDeltaDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + dest);
-    fs.rename(fromPath, newDeltaDir);
-    fs.delete(fromPath, true);
+    Util.cleanupEmptyDir(conf, tmpTableName + "_result");
   }
 
   /**
@@ -117,21 +92,27 @@
    * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result)
    * @param t Table to compact
    * @param sd storage descriptor of table or partition to compact
-   * @param sourceTabLocation location the "source table" (temp table 1) should go
-   * @param resultTabLocation location the "result table (temp table 2) should go
   * @param dir the parent directory of delta directories
-   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @param writeIds ValidWriteIdList for the table/partition we are compacting
   * @return List of 3 query strings: 2 create table, 1 alter table
   */
  private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
-      String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
-      ValidWriteIdList validWriteIdList) {
+      AcidUtils.Directory dir, ValidWriteIdList writeIds, HiveConf conf) {
     List<String> queries = new ArrayList<>();
-    queries.add(
-        MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true));
-    buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add);
+    queries.add(MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, null, true));
+    buildAlterTableQuery(tmpTableBase, dir, writeIds).ifPresent(queries::add);
+
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWatermark = writeIds.getHighWatermark();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false)
+            .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1)
+            .visibilityTxnId(compactorTxnId);
+    Path location = new Path(sd.getLocation());
+    String resultTabLocation = AcidUtils.baseOrDeltaSubdirPath(location, options).toString();
     queries.add(MmQueryCompactorUtils
-        .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false));
+        .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false));
     return queries;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
index 891696dba7..064e4a76ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
@@ -59,16 +59,12 @@ private MmQueryCompactorUtils() {}
   * @param sd StorageDescriptor of the table or partition we are modeling the new table on
   * @param location of the new table
   * @param isPartitioned should the new table be partitioned
-  * @param isExternal should the new table be external
   * @return query string creating the new table
   */
  static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd,
-     String location, boolean isPartitioned, boolean isExternal) {
-    StringBuilder query = new StringBuilder("create temporary ");
-    if (isExternal) {
-      query.append("external ");
-    }
-    query.append("table ").append(fullName).append("(");
+     String location, boolean isPartitioned) {
+    StringBuilder query = new StringBuilder("create temporary external table ")
+        .append(fullName).append("(");
     List<FieldSchema> cols = sourceTab.getSd().getCols();
     boolean isFirst = true;
     for (FieldSchema col : cols) {
@@ -132,8 +128,13 @@ static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor
       ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
     }
     query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
-        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
-        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+        .append("' OUTPUTFORMAT '")
+        .append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("'");
+
+    if (location != null && !location.isEmpty()) {
+      query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+    query.append(" TBLPROPERTIES (");
     // Exclude all standard table properties.
     Set<String> excludes = getHiveMetastoreConstants();
     excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
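
Note on the path computation in this patch: both compactors previously wrote into a scratch `_base`/`_result` directory and had `commitCompaction` rename it into place; now the temp table is created directly on the final `base_N_vM`/`delta_K_N_vM` directory, which `AcidUtils.baseOrDeltaSubdirPath` derives from the write-ID range and the compactor's visibility transaction, leaving `commitCompaction` with only the `Util.cleanupEmptyDir` call. A minimal sketch of that derivation, mirroring the calls in the patch; the table path and all IDs are hypothetical:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class CompactionDirSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Hypothetical IDs: no open write ID below the high watermark of 9,
    // and the compactor itself runs in transaction 12.
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .writingBase(true)                 // major compaction targets a base dir
        .isCompressed(false)
        .minimumWriteId(1).maximumWriteId(9)
        .bucket(0).statementId(-1)
        .visibilityTxnId(12);
    Path tablePath = new Path("/warehouse/db.db/t");   // hypothetical location
    // Prints a path like /warehouse/db.db/t/base_0000009_v0000012;
    // with writingBase(false) it would be a delta_0000001_0000009_v0000012 dir.
    System.out.println(AcidUtils.baseOrDeltaSubdirPath(tablePath, options));
  }
}
```

Since the compaction query now writes straight into that directory, there is nothing left to move at commit time.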
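On the DDL side, `getCreateQuery` now always emits `create temporary external table` (external, presumably so that dropping the temp table cannot delete the compacted data it writes under the real table tree) and appends `LOCATION` only when a path is supplied; the minor compactor's source table passes `null` and falls back to the default temp-table location. A simplified sketch of that conditional assembly; the column list, SerDe, and TBLPROPERTIES handling are elided, and all names and paths are illustrative:

```java
public class CreateQuerySketch {
  static String createQuery(String fullName, String location) {
    StringBuilder query = new StringBuilder("create temporary external table ")
        .append(fullName).append("(`id` int)");   // real code copies source columns
    // New behavior: LOCATION is emitted only when a target path was supplied.
    if (location != null && !location.isEmpty()) {
      query.append(" LOCATION '").append(location).append("'");
    }
    return query.toString();
  }

  public static void main(String[] args) {
    // Minor compaction source table: no location, default temp-table directory.
    System.out.println(createQuery("db.tmp_minor_compactor_t_1", null));
    // Result table: created directly on the final delta directory.
    System.out.println(createQuery("db.tmp_minor_compactor_t_1_result",
        "/warehouse/db.db/t/delta_0000001_0000009_v0000012"));
  }
}
```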