diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 2d0393662c..6152e12901 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1744,6 +1744,41 @@ public void testCompactionOnDataLoadedInPath() throws Exception {
   }
 
+  @Test
+  public void testCompactionDataLoadedWithInsertOverwrite() throws Exception {
+    String externalTableName = "test_comp_txt";
+    String tableName = "test_comp";
+    executeStatementOnDriver("DROP TABLE IF EXISTS " + externalTableName, driver);
+    executeStatementOnDriver("DROP TABLE IF EXISTS " + tableName, driver);
+    executeStatementOnDriver("CREATE EXTERNAL TABLE " + externalTableName + "(a int, b int, c int) STORED AS TEXTFILE", driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a int, b int, c int) STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+
+    executeStatementOnDriver("INSERT INTO " + externalTableName + " values (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + externalTableName, driver);
+
+    executeStatementOnDriver("UPDATE " + tableName + " SET b=55, c=66 WHERE a=2", driver);
+    executeStatementOnDriver("DELETE FROM " + tableName + " WHERE a=4", driver);
+    executeStatementOnDriver("UPDATE " + tableName + " SET b=77 WHERE a=1", driver);
+
+    executeStatementOnDriver("SELECT * FROM " + tableName + " ORDER BY a", driver);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(3, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\t77\t1", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\t55\t66", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\t3\t3", valuesReadFromHiveDriver.get(2));
+
+    runMajorCompaction("default", tableName);
+
+    // Validate after compaction.
+ executeStatementOnDriver("SELECT * FROM " + tableName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(3, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\t77\t1", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\t55\t66", valuesReadFromHiveDriver.get(1)); + } + private List getCompactionList() throws Exception { conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); runInitiator(conf); @@ -1805,6 +1840,13 @@ private void verifyCompactions(List compacts, Sorted } } + private void verifyCompactions(List compacts, String tblName) { + for (ShowCompactResponseElement compact : compacts) { + Assert.assertEquals("default", compact.getDbname()); + Assert.assertEquals(tblName, compact.getTablename()); + Assert.assertEquals("initiated", compact.getState()); + } + } private void processStreamingAPI(String dbName, String tblName, boolean newStreamingAPI) throws StreamingException, ClassNotFoundException, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index f543418179..16c915959c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1079,7 +1079,11 @@ public Options clone() { assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath(); //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf); - Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket); + String attemptId = null; + if (deltasToAttemptId != null) { + attemptId = deltasToAttemptId.get(mergerOptions.getBaseDir().toString()); + } + Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket, attemptId); if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) { //doing major compaction - it's possible where full compliment of bucket files is not //required (on Tez) that base_x/ doesn't have a file for 'bucket' diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index f26d4bb2c3..9410a96351 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -467,6 +467,13 @@ public void write(DataOutput dataOutput) throws IOException { } else { dataOutput.writeInt(base.toString().length()); dataOutput.writeBytes(base.toString()); + String attemptId = deltasToAttemptId.get(base.toString()); + if (attemptId == null) { + dataOutput.writeInt(0); + } else { + dataOutput.writeInt(attemptId.length()); + dataOutput.writeBytes(attemptId.toString()); + } } dataOutput.writeInt(deltas.length); for (int i = 0; i < deltas.length; i++) { @@ -504,10 +511,17 @@ public void readFields(DataInput dataInput) throws IOException { LOG.debug("Read bucket number of " + bucketNum); len = dataInput.readInt(); LOG.debug("Read base path length of " + len); + String baseAttemptId = null; if (len > 0) { buf = new byte[len]; dataInput.readFully(buf); base = new Path(new String(buf)); + len = dataInput.readInt(); + if (len > 0) { + buf = new byte[len]; + dataInput.readFully(buf); + baseAttemptId = new String(buf); + } } numElements = dataInput.readInt(); deltas = new 
Path[numElements]; @@ -526,6 +540,9 @@ public void readFields(DataInput dataInput) throws IOException { } deltasToAttemptId.put(deltas[i].toString(), attemptId); } + if (baseAttemptId != null) { + deltasToAttemptId.put(base.toString(), baseAttemptId); + } } public void set(CompactorInputSplit other) {
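
Note on the split serialization above (illustration, not part of the patch): the CompactorInputSplit.write()/readFields() changes let the base directory, like each delta, carry an attempt ID on the wire, encoded as a length-prefixed string where a length of 0 means "no attempt ID". The standalone sketch below round-trips that convention; the helper names writeOptionalString/readOptionalString, the class name, and the example path and attempt ID are hypothetical and do not appear in the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class AttemptIdWireFormatSketch {

  // Same convention as the split: writeInt(length) then writeBytes(value),
  // with length 0 standing in for a missing attempt ID. writeBytes() emits
  // one byte per char, which is safe for ASCII paths and attempt IDs.
  static void writeOptionalString(DataOutput out, String value) throws IOException {
    if (value == null) {
      out.writeInt(0);
    } else {
      out.writeInt(value.length());
      out.writeBytes(value);
    }
  }

  static String readOptionalString(DataInput in) throws IOException {
    int len = in.readInt();
    if (len <= 0) {
      return null; // length 0 decodes to "no attempt ID"
    }
    byte[] buf = new byte[len];
    in.readFully(buf);
    return new String(buf);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    // Hypothetical base dir and attempt ID, encoded the way the split
    // writes them: path first, then the optional attempt ID.
    writeOptionalString(out, "/warehouse/test_comp/base_0000004");
    writeOptionalString(out, "000001");

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(readOptionalString(in)); // /warehouse/test_comp/base_0000004
    System.out.println(readOptionalString(in)); // 000001
  }
}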