diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 445e39c260edc68f511550271a7ac471fae908fe..7ae33fadf7c8da83924e317287d160faf5364e3d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1585,6 +1585,117 @@ public void testDisableCompactionDuringReplLoad() throws Exception {
     runCleaner(conf);
   }
 
+  /**
+   * Tests compaction of tables that were populated by LOAD DATA INPATH statements.
+   *
+   * In this scenario the original ORC files are structured in the following way:
+   * comp3
+   * |--delta_0000001_0000001_0000
+   *    |--000000_0
+   * |--delta_0000002_0000002_0000
+   *    |--000000_0
+   *    |--000001_0
+   *
+   * ...where the comp3 table is not bucketed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCompactionOnDataLoadedInPath() throws Exception {
+    // Set up the LOAD DATA INPATH scenario.
+    executeStatementOnDriver("drop table if exists comp0", driver);
+    executeStatementOnDriver("drop table if exists comp1", driver);
+    executeStatementOnDriver("drop table if exists comp3", driver);
+
+    executeStatementOnDriver("create external table comp0 (a string)", driver);
+    executeStatementOnDriver("insert into comp0 values ('1111111111111')", driver);
+    executeStatementOnDriver("insert into comp0 values ('2222222222222')", driver);
+    executeStatementOnDriver("insert into comp0 values ('3333333333333')", driver);
+    executeStatementOnDriver("create external table comp1 stored as orc as select * from comp0", driver);
+
+    executeStatementOnDriver("create table comp3 (a string) stored as orc " +
+        "TBLPROPERTIES ('transactional'='true')", driver);
+
+    IMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
+    Table table = hmsClient.getTable("default", "comp1");
+    FileSystem fs = FileSystem.get(conf);
+    Path path000 = fs.listStatus(new Path(table.getSd().getLocation()))[0].getPath();
+    Path path001 = new Path(path000.toString().replace("000000", "000001"));
+    Path path002 = new Path(path000.toString().replace("000000", "000002"));
+    fs.copyFromLocalFile(path000, path001);
+    fs.copyFromLocalFile(path000, path002);
+
+    executeStatementOnDriver("load data inpath '" + path002.toString() + "' into table comp3", driver);
+    executeStatementOnDriver("load data inpath '" + path002.getParent().toString() + "' into table comp3", driver);
+
+    // Run compaction.
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    CompactionRequest rqst = new CompactionRequest("default", "comp3", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    runWorker(conf);
+    ShowCompactRequest scRqst = new ShowCompactRequest();
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(scRqst).getCompacts();
+    assertEquals(1, compacts.size());
+    assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(0).getState());
+
+    runCleaner(conf);
+    compacts = txnHandler.showCompact(scRqst).getCompacts();
+    assertEquals(1, compacts.size());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
+
+    // Check compacted content and file structure.
+    table = hmsClient.getTable("default", "comp3");
+    List<String> rs = execSelectAndDumpData("select * from comp3", driver, "select");
+    assertEquals(9, rs.size());
+    assertEquals(3, rs.stream().filter(p -> "1111111111111".equals(p)).count());
+    assertEquals(3, rs.stream().filter(p -> "2222222222222".equals(p)).count());
+    assertEquals(3, rs.stream().filter(p -> "3333333333333".equals(p)).count());
+
+    FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation()));
+    // base dir
+    assertEquals(1, files.length);
+    assertEquals("base_0000002_v0000012", files[0].getPath().getName());
+    files = fs.listStatus(files[0].getPath(), AcidUtils.bucketFileFilter);
+    // bucket files
+    assertEquals(2, files.length);
+    assertEquals(1, Arrays.stream(files).filter(p -> "bucket_00000".equals(p.getPath().getName())).count());
+    assertEquals(1, Arrays.stream(files).filter(p -> "bucket_00001".equals(p.getPath().getName())).count());
+
+    // Another insert into the newly compacted table.
+    executeStatementOnDriver("insert into comp3 values ('4444444444444')", driver);
+
+    // Compact with the extra row too.
+    txnHandler.compact(rqst);
+    runWorker(conf);
+    compacts = txnHandler.showCompact(scRqst).getCompacts();
+    assertEquals(2, compacts.size());
+    assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(0).getState());
+
+    runCleaner(conf);
+    compacts = txnHandler.showCompact(scRqst).getCompacts();
+    assertEquals(2, compacts.size());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
+
+    // Check compacted content and file structure.
+    rs = execSelectAndDumpData("select * from comp3", driver, "select");
+    assertEquals(10, rs.size());
+    assertEquals(3, rs.stream().filter(p -> "1111111111111".equals(p)).count());
+    assertEquals(3, rs.stream().filter(p -> "2222222222222".equals(p)).count());
+    assertEquals(3, rs.stream().filter(p -> "3333333333333".equals(p)).count());
+    assertEquals(1, rs.stream().filter(p -> "4444444444444".equals(p)).count());
+
+    files = fs.listStatus(new Path(table.getSd().getLocation()));
+    // base dir
+    assertEquals(1, files.length);
+    assertEquals("base_0000003_v0000015", files[0].getPath().getName());
+    files = fs.listStatus(files[0].getPath(), AcidUtils.bucketFileFilter);
+    // bucket files
+    assertEquals(2, files.length);
+    assertEquals(1, Arrays.stream(files).filter(p -> "bucket_00000".equals(p.getPath().getName())).count());
+    assertEquals(1, Arrays.stream(files).filter(p -> "bucket_00001".equals(p.getPath().getName())).count());
+  }
+
   private List<ShowCompactResponseElement> getCompactionList() throws Exception {
     conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
     runInitiator(conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 2b2cc1a2ba8377aa3681b1a3454a0d64369eef64..e26794355ca1e73be5c103f8405c471da870bbe3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -618,18 +618,36 @@ public String toString() {
         }
       }
 
+      boolean isTableBucketed = entries.getInt(NUM_BUCKETS, -1) != -1;
       List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size());
       for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
         BucketTracker bt = e.getValue();
+        // For non-bucketed tables we might not have a 00000x_0 file in every delta dir, e.g. after
+        // multiple ingestions of various sizes.
+        Path[] deltasForSplit = isTableBucketed ? deltaDirs : getDeltaDirsFromBucketTracker(bt);
         splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets,
-            bt.sawBase ? baseDir : null, deltaDirs));
+            bt.sawBase ? baseDir : null, deltasForSplit));
       }
 
       LOG.debug("Returning " + splits.size() + " splits");
       return splits.toArray(new InputSplit[splits.size()]);
     }
 
+    private static Path[] getDeltaDirsFromBucketTracker(BucketTracker bucketTracker) {
+      List<Path> resultList = new ArrayList<>(bucketTracker.buckets.size());
+
+      for (int i = 0; i < bucketTracker.buckets.size(); ++i) {
+        Path p = bucketTracker.buckets.get(i).getParent();
+        if (p.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
+            p.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+          resultList.add(p);
+        }
+      }
+
+      return resultList.toArray(new Path[0]);
+    }
+
     @Override
     public RecordReader<NullWritable, CompactorInputSplit> getRecordReader(
         InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
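
Note on the literal directory names asserted in the test above (base_0000002_v0000012 and base_0000003_v0000015): below is a minimal standalone sketch, not part of the patch, of how such a compacted base directory name is composed, assuming the usual ACID naming convention of a 7-digit zero-padded highest write id followed by a _v<visibilityTxnId> suffix appended by the compactor. The concrete ids (2/12 and 3/15) simply fall out of the write ids and compactor transactions opened earlier in the test run, so they are environment-dependent.

    public class BaseDirNameSketch {
      // Assumed format: "base_" + 7-digit write id + "_v" + 7-digit visibility txn id.
      static String baseDirName(long highestWriteId, long visibilityTxnId) {
        return String.format("base_%07d_v%07d", highestWriteId, visibilityTxnId);
      }

      public static void main(String[] args) {
        // Write id 2 compacted under txn 12 -> "base_0000002_v0000012", as asserted above.
        System.out.println(baseDirName(2, 12));
        // Write id 3 compacted under txn 15 -> "base_0000003_v0000015".
        System.out.println(baseDirName(3, 15));
      }
    }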