diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index b7245e2c35..2526988041 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -316,27 +316,25 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws
         + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
         + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
     executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
-    // Run major compaction and cleaner
+
+    // Prepare expected results. The ACID sort order is not guaranteed here, so an explicit ORDER BY is needed.
+    List<String> expectedRsPtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+        + " where ds='today' ORDER BY ROW__ID", driver);
+    List<String> expectedRsPtnYesterday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+        + " where ds='yesterday' ORDER BY ROW__ID", driver);
+
+    // Run major compaction and cleaner
     runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today");
     runCleaner(hiveConf);
-    List<String> expectedRsPtnToday = new ArrayList<>();
-    expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t3\tNULL\ttoday");
-    expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\tNULL\ttoday");
-    expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\tNULL\ttoday");
-    expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday");
-    expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":2}\t4\t4\t1005\ttoday");
-    List<String> expectedRsPtnYesterday = new ArrayList<>();
-    expectedRsPtnYesterday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4\tNULL\tyesterday");
-    expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t3\t4\t1002\tyesterday");
-    expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":2}\t4\t3\t1004\tyesterday");
-    // Partition 'today'
+
+    // Verify results. After compaction the ACID sort order can be assumed.
     List<String> rsCompactPtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
         + " where ds='today'", driver);
     Assert.assertEquals("compacted read", expectedRsPtnToday, rsCompactPtnToday);
-    // Partition 'yesterday'
     List<String> rsCompactPtnYesterday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
         + " where ds='yesterday'", driver);
     Assert.assertEquals("compacted read", expectedRsPtnYesterday, rsCompactPtnYesterday);
+
     // Clean up
     executeStatementOnDriver("drop table " + tblName, driver);
   }
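Note on the test change above: before compaction the select can return rows in any order, hence the explicit ORDER BY ROW__ID when the expected rows are captured; after query-based major compaction the ACID sort order can be assumed, so the plain selects are compared directly. The previously hard-coded expected rows also pinned the physical buckets through their "bucketid" values. A rough decoding sketch (values taken from the removed lines; it assumes the same BucketCodec API that the compactor change below uses):

    // Sketch only -- assumes org.apache.hadoop.hive.ql.io.BucketCodec is on the classpath.
    int today = 536870912;      // "bucketid" of the rows in partition ds='today'
    int yesterday = 536936448;  // "bucketid" of the rows in partition ds='yesterday'
    int todayBucket = BucketCodec.determineVersion(today).decodeWriterId(today);             // expected: 0 -> bucket_00000
    int yesterdayBucket = BucketCodec.determineVersion(yesterday).decodeWriterId(yesterday); // expected: 1 -> bucket_00001
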
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index 1eacf69657..704814853c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -54,7 +56,16 @@ public static void runOnDriver(HiveConf conf, String user, SessionState sessionS
    * For Query Based compaction to run the query to generate the compacted data.
    */
   public static void runOnDriver(HiveConf conf, String user,
-      SessionState sessionState, String query, ValidWriteIdList writeIds, long compactorTxnId)
+      SessionState sessionState, String query, ValidWriteIdList writeIds, long compactorTxnId)
+      throws HiveException {
+    runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId, false);
+  }
+
+  /**
+   * For Query Based compaction to run a query and, optionally, return its results (e.g. to retrieve bucket ids).
+   */
+  public static List<String> runOnDriver(HiveConf conf, String user, SessionState sessionState,
+      String query, ValidWriteIdList writeIds, long compactorTxnId, boolean getResults)
       throws HiveException {
     if(writeIds != null && compactorTxnId < 0) {
       throw new IllegalArgumentException(JavaUtils.txnIdToString(compactorTxnId) +
@@ -62,6 +73,7 @@ public static void runOnDriver(HiveConf conf, String user,
     }
     SessionState.setCurrentSessionState(sessionState);
     boolean isOk = false;
+    List<String> results = new ArrayList<>();
     try {
       QueryState qs = new QueryState.Builder().withHiveConf(conf).withGenerateNewQueryId(true).nonIsolated().build();
       Driver driver = new Driver(qs, null, null);
@@ -69,9 +81,15 @@ public static void runOnDriver(HiveConf conf, String user,
       try {
         try {
           driver.run(query);
+          if (getResults) {
+            driver.getResults(results);
+          }
         } catch (CommandProcessorException e) {
           LOG.error("Failed to run " + query, e);
           throw new HiveException("Failed to run " + query, e);
+        } catch (IOException e) {
+          LOG.error("Failed to fetch query results for: " + query, e);
+          throw new HiveException("Failed to fetch query results for: " + query, e);
         }
       } finally {
         driver.close();
@@ -87,6 +105,8 @@ public static void runOnDriver(HiveConf conf, String user,
         SessionState.detachSession();
       }
     }
+
+    return results;
   }
 }
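A rough usage sketch of the new runOnDriver overload, mirroring the call that MajorQueryCompactor makes below (conf, user, sessionState, writeIds, compactorTxnId and tmpTableName are assumed to be in scope exactly as they are there):

    // With getResults == true the fetched rows are returned; with false the behaviour of the old
    // overload is preserved and the internally created empty list is simply discarded.
    List<String> bucketRows = DriverUtils.runOnDriver(conf, user, sessionState,
        "select distinct(bucket) from " + tmpTableName + " order by bucket asc",
        writeIds, compactorTxnId, true);

Each returned string is one fetched row, so callers still parse the column values themselves, as the compactor does with Integer.parseInt below.
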
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 38689ef86c..1b6416b6d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -40,6 +41,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -48,6 +50,7 @@ class MajorQueryCompactor extends QueryCompactor {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorQueryCompactor.class.getName());
+  private static final String QUERY_FETCH_BUCKET_ID_LIST = "select distinct(bucket) from %s order by bucket asc";
 
   @Override
   void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
@@ -91,6 +94,30 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
        * db/db_tmp_compactor_tbl_1234/00001_0
        */
      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
+
+      /*
+       * Prepare bucketIdList, the mapping between the bucket ID calculated from the filename of the temporary table
+       * buckets and the actual bucket ID, to make sure the bucket IDs and the final filenames will match.
+       * Assumptions (for the output of the previous step):
+       * 1. Rows with the same bucket ID are in the same file.
+       * 2. Rows in the same file have the same bucket ID.
+       */
+      query = String.format(QUERY_FETCH_BUCKET_ID_LIST, tmpTableName);
+      // Change the split grouping mode back to "query", otherwise this select would not work.
+      conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "query");
+      List<String> bucketIdStringList = DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId, true);
+      List<Integer> bucketIdList = new ArrayList<>();
+      for (String bucketIdString : bucketIdStringList) {
+        try {
+          int bucketProperty = Integer.parseInt(bucketIdString.trim());
+          int bucketNum = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+          bucketIdList.add(bucketNum);
+        } catch (NumberFormatException e) {
+          LOG.warn("Failed to prepare bucket ID list.", e);
+          bucketIdList.clear();
+        }
+      }
+
      /*
       * This achieves a final layout like (wid is the highest valid write id for this major compaction):
       * db/tbl/ptn/base_wid/bucket_00000
@@ -99,7 +126,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
       org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
       String tmpLocation = tempTable.getSd().getLocation();
       commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds,
-          compactorTxnId);
+          compactorTxnId, bucketIdList);
     } catch (HiveException e) {
       LOG.error("Error doing query based major compaction", e);
       throw new IOException(e);
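To make the mapping concrete, a worked example with assumed values: suppose the compaction query produced a single temporary file 000000_0 and every row in it carries the bucket property 536936448. The fetch query then returns one row, and bucketIdList ends up holding [1]:

    // Hypothetical single-bucket case; the literals are illustrative, not taken from a real run
    // (uses java.util.Arrays and java.util.Collections).
    List<String> bucketIdStringList = Arrays.asList("536936448");                      // rows returned by the fetch query
    int bucketNum = BucketCodec.determineVersion(536936448).decodeWriterId(536936448); // 1
    List<Integer> bucketIdList = Collections.singletonList(bucketNum);                 // [1]

So the file whose name parses as bucket 0 actually holds the data of bucket 1, which is exactly what commitCrudMajorCompaction corrects below.
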
@@ -176,7 +203,8 @@ private String buildCrudMajorCompactionQuery(Table t, Partition p, String tmpNam
    * we will end up with one file per bucket.
    */
   private void commitCrudMajorCompaction(String from, String tmpTableName, String to, HiveConf conf,
-      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+      ValidWriteIdList actualWriteIds, long compactorTxnId, List<Integer> bucketIdList) throws IOException, HiveException {
+    boolean checkBucketIdFromList = !bucketIdList.isEmpty();
     Path fromPath = new Path(from);
     Path toPath = new Path(to);
     Path tmpTablePath = new Path(fromPath, tmpTableName);
@@ -195,17 +223,24 @@ private void commitCrudMajorCompaction(String from, String tmpTableName, String
     }
     LOG.info("Moving contents of {} to {}", tmpTablePath, to);
     /*
-     * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on
-     * TODO/ToThink:
-     * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination?
+     * Mapping the temporary file named 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on is not enough.
+     * Since temporary tables are not transactional, the original bucket ids are not reflected in the filenames of
+     * the buckets of temporary tables. E.g. a multi insert that writes to two different partitions, executed with
+     * multiple reducers, may produce a single bucket_00001 file. Since it is a single bucket, the output file after
+     * compaction will still be named 0000_0, so we cannot assume that the final filename for 0000_0 should be
+     * bucket_00000, and bucket_00001 for 0000_1, respectively. Instead a mapping needs to be done.
+     * The mapping is stored in bucketIdList: for 0000_0 the 0th element stores the calculated bucket ID, for
+     * 0000_1 the 1st element, and so on...
      */
-    // List buckCols = t.getSd().getBucketCols();
     FileStatus[] children = fs.listStatus(fromPath);
     for (FileStatus filestatus : children) {
       String originalFileName = filestatus.getPath().getName();
       // This if() may not be required I think...
       if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
         int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
+        if (checkBucketIdFromList) {
+          bucketId = bucketIdList.get(bucketId) == null ? bucketId : bucketIdList.get(bucketId);
+        }
         options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
             .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
         Path finalBucketFile = AcidUtils.createFilename(toPath, options);
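For the commit step, a short sketch of how one temporary file is renamed under the new mapping (variable names are those of the loop above; the concrete values are assumptions for illustration):

    // Suppose bucketIdList = [1] and the loop is looking at .../000000_0.
    int parsed = AcidUtils.parseBucketId(filestatus.getPath());            // 0, derived from the file name
    int bucketId = checkBucketIdFromList && bucketIdList.get(parsed) != null
        ? bucketIdList.get(parsed) : parsed;                               // 1, taken from the mapping
    // AcidUtils.createFilename(toPath, options) with .bucket(bucketId) then yields a path like
    // db/tbl/ptn/base_wid/bucket_00001 instead of .../bucket_00000, matching the bucket property stored in the rows.

When bucketIdList is empty (for example because the fetch query result could not be parsed and the list was cleared), checkBucketIdFromList is false and the pre-existing positional mapping is kept as a fallback.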