diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index b828f4cffd..615fa4b3ae 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -429,7 +429,7 @@ public void testNonStandardConversion02() throws Exception {
    * Subdirs are named HIVE_UNION_SUBDIR_1/, HIVE_UNION_SUBDIR_2/, etc
    * For Acid tables the writer for each dir must have a different statementId ensured by
    * {@link org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor}.
-   * {@link org.apache.hadoop.hive.ql.metadata.Hive#moveAcidFiles(FileSystem, FileStatus[], Path, List)} drops the union subdirs
+   * {@link org.apache.hadoop.hive.ql.metadata.Hive#moveAcidFiles(FileSystem, FileStatus[], Path, List, boolean)} drops the union subdirs
    * since each delta file has a unique name.
    */
   @Ignore("HIVE-19509: Disable tests that are failing continuously")
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index f80a945be5..0ba1991f6d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -307,7 +307,7 @@ public int execute(DriverContext driverContext) {
         FileStatus[] srcs = srcFs.globStatus(sourcePath);
         if(srcs != null) {
           List<Path> newFiles = new ArrayList<>();
-          Hive.moveAcidFiles(srcFs, srcs, targetPath, newFiles);
+          Hive.moveAcidFiles(srcFs, srcs, targetPath, newFiles, false);
         } else {
           LOG.debug("No files found to move from " + sourcePath + " to " + targetPath);
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1359dc3be3..23ea5adb1f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -199,6 +199,12 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
    */
   OrcRecordUpdater(Path partitionRoot,
                    AcidOutputFormat.Options options) throws IOException {
+    if(options.isWritingBase()) {
+      if(partitionRoot.toString().contains("HIVE_UNION_SUBDIR_")) {
+        options.writingBase(false);//todo: clone this
+      }
+    }
+
     this.options = options;
     // Initialize acidOperationalProperties based on table properties, and
     // if they are not available, see if we can find it in the job configuration.
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2ec131e274..d11655b272 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2367,7 +2367,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     try {
       FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
       copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
-          loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
+          loadFileType == LoadFileType.OVERWRITE_EXISTING || isInsertOverwrite, newFiles,
           tbl.getNumBuckets() > 0 ? true : false, isFullAcidTable);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
@@ -3868,7 +3868,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem
     // If we're moving files around for an ACID write then the rules and paths are all different.
     // You can blame this on Owen.
     if (isAcidIUD) {
-      moveAcidFiles(srcFs, srcs, destf, newFiles);
+      moveAcidFiles(srcFs, srcs, destf, newFiles, isOverwrite);
     } else {
       // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops,
       // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc.
@@ -3879,7 +3879,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem
   }

   public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
-                                   List<Path> newFiles) throws HiveException {
+                                   List<Path> newFiles, boolean isOverwrite) throws HiveException {
     // The layout for ACID files is table|partname/base|delta|delete_delta/bucket
     // We will always only be writing delta files ( except IOW which writes base_X/ ).
     // In the buckets created by FileSinkOperator
@@ -3929,6 +3929,34 @@ produced by a (optimized) Union All query
               fs.listStatus(unionSubdir.getPath(), AcidUtils.originalBucketFilter));
           }
           origBucketStats = buckets.toArray(new FileStatus[buckets.size()]);
+          if(isOverwrite && unionSubdirs.length > 0) {
+            LOG.info("Detected multi-folder IOW with Union All: deleting files under " + dst);
+            for(FileStatus toDelete : fs.listStatus(dst, AcidUtils.hiddenFileFilter)) {
+              int attempts = 3;
+              while(--attempts >= 0) {
+                try {
+                  if(fs.delete(toDelete.getPath(), true)) {
+                    break;
+                  }
+                }
+                catch(IOException ex) {
+                  LOG.info("IOW failed to delete: " + toDelete.getPath() + "." +
+                      (attempts > 0 ? " Will retry." : ""));
+                }
+                try {
+                  Thread.sleep(200);
+                }
+                catch(InterruptedException ex) {
+                  //best effort: ignore the interrupt and use up the remaining attempts
+                }
+              }
+              if(attempts < 0) {
+                String msg = "IOW failed to delete: " + toDelete.getPath() + ". Aborting.";
+                LOG.error(msg);
+                throw new RuntimeException(msg);
+              }
+            }
+          }
         }
       } catch (IOException e) {
         String msg = "Unable to look for bucket files in src path " + srcPath.toUri().toString();
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index f071531cec..ef5f211dd8 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -273,15 +273,65 @@ public void testInsertToAcidWithUnionRemove() throws Exception {
       {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000"},
       {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001"},
     };
-    checkExpected(rs, expected, "Unexpected row count after ctas");
+    checkExpected(rs, expected, "Unexpected row count after insert");
   }
+
   /**
-   * The idea here is to create a non acid table that was written by multiple writers, i.e.
-   * unbucketed table that has 000000_0 & 000001_0, for example.
-   * Also, checks that we can handle a case when data files can be at multiple levels (subdirs)
-   * of the table.
+   * todo: partitioned tables
+   * @throws Exception
    */
   @Test
+  public void testInsertOverwriteToAcidWithUnionRemove() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    d.close();
+    d = new Driver(hiveConf);
+    int[][] values = {{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}};
+    runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')");
+
+    runStatementOnDriver("insert into T(a,b)" + makeValuesClause(values));
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a");
+
+    String expected[][] = {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "/delta_0000001_0000001_0000/bucket_00000"}
+    };
+    checkExpected(rs, expected, "Unexpected row count after insert");
+
+
+    runStatementOnDriver("insert overwrite table T select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9");
+
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a");
+
+/*
+{"writeid":2,"bucketid":536870913,"rowid":0}    1    2    t/delta_0000002_0000002_0001/bucket_00000
+{"writeid":2,"bucketid":536870913,"rowid":1}    3    4    t/delta_0000002_0000002_0001/bucket_00000
+{"writeid":2,"bucketid":536870914,"rowid":0}    5    6    t/delta_0000002_0000002_0002/bucket_00000
+{"writeid":2,"bucketid":536936450,"rowid":0}    7    8    t/delta_0000002_0000002_0002/bucket_00001
+{"writeid":2,"bucketid":536870915,"rowid":0}    9    10   t/delta_0000002_0000002_0003/bucket_00000
+*/
+    String expectedw[][] = {
+        {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000002_0000002_0001/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000002_0000002_0001/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000002_0000002_0002/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000002_0000002_0002/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000002_0000002_0003/bucket_00000"}
+    };
+    checkExpected(rs, expectedw, "Unexpected row count after iow");
+
+  }
+  /**
+   * The idea here is to create a non acid table that was written by multiple writers, i.e.
+   * unbucketed table that has 000000_0 & 000001_0, for example.
+   * Also, checks that we can handle a case when data files can be at multiple levels (subdirs)
+   * of the table.
+   */
+  @Test
   public void testToAcidConversionMultiBucket() throws Exception {
     //need to disable these so that automatic merge doesn't merge the files
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
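
The delta directory names in the test expectations above also show why the per-branch statementId matters under HIVE_OPTIMIZE_UNION_REMOVE: each Union All branch writes through its own HIVE_UNION_SUBDIR_N with a distinct statementId, so once moveAcidFiles flattens the subdirectories the deltas of a single write cannot collide. Below is a small standalone sketch that reproduces those names; the %07d and %04d widths are inferred from the paths visible above (for example delta_0000002_0000002_0001), not taken from AcidUtils, and the class and method names are illustrative.

public class DeltaNameSketch {
  /** Formats a delta directory name like those in the expected file paths. */
  static String deltaDir(long minWriteId, long maxWriteId, int statementId) {
    // widths inferred from names like delta_0000002_0000002_0001 in the test
    return String.format("delta_%07d_%07d_%04d", minWriteId, maxWriteId, statementId);
  }

  public static void main(String[] args) {
    // Each optimized Union All branch gets its own statementId, so the three
    // branches of the IOW in the test land in three distinct delta dirs:
    for (int statementId = 1; statementId <= 3; statementId++) {
      System.out.println(deltaDir(2, 2, statementId) + "   <- union branch " + statementId);
    }
    // prints delta_0000002_0000002_0001 .. delta_0000002_0000002_0003,
    // matching the expectedw paths in testInsertOverwriteToAcidWithUnionRemove
  }
}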
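
For readers who want the retry logic from the Hive.moveAcidFiles change above in isolation, here is a minimal self-contained sketch of the same bounded retry-delete pattern. It is not Hive code: the class name, the deleteWithRetries helper, and the main demo are illustrative, and only Configuration, FileSystem.getLocal, delete, mkdirs and exists are real Hadoop APIs. One deliberate difference is flagged in the comments: where the patch swallows InterruptedException and keeps retrying, the sketch restores the interrupt flag and stops early, which is the more conventional handling.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RetryDeleteSketch {
  private static final int MAX_ATTEMPTS = 3;  // same attempt budget as the patch
  private static final long SLEEP_MS = 200;   // same backoff as the patch

  /** Recursively deletes {@code target}, retrying transient failures. */
  static void deleteWithRetries(FileSystem fs, Path target) throws IOException {
    IOException lastError = null;
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        if (fs.delete(target, true)) {  // true = recursive; returns true on success
          return;
        }
      } catch (IOException ex) {
        lastError = ex;                 // remember the failure, fall through to retry
      }
      try {
        Thread.sleep(SLEEP_MS);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // unlike the patch: preserve the interrupt
        break;                              // and stop retrying promptly
      }
    }
    throw new IOException("Failed to delete " + target + " after "
        + MAX_ATTEMPTS + " attempts", lastError);
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path demoDir = new Path(System.getProperty("java.io.tmpdir"), "retry_delete_demo");
    fs.mkdirs(demoDir);
    deleteWithRetries(fs, demoDir);
    System.out.println("gone: " + !fs.exists(demoDir));
  }
}

Failing hard once the attempt budget is exhausted mirrors the patch's RuntimeException: a half-cleaned destination must abort the insert overwrite rather than silently mix old and new deltas.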