diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index d0b5cf6047..dbff4012a8 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -88,7 +88,6 @@ public void setUp() throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); @@ -307,7 +306,7 @@ public void testNonStandardConversion01() throws Exception { * of the partition dir. CTAS/Union/Tez doesn't support partition tables. The only way is to copy * data files in directly. */ - @Ignore("HIVE-17214") +// @Ignore("HIVE-17214") @Test public void testNonStandardConversion02() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf @@ -407,6 +406,7 @@ public void testNonStandardConversion02() throws Exception { /** * CTAS + Tez + Union creates a non-standard layout in table dir * Each leg of the union places data into a subdir of the table/partition. Subdirs are named 1/, 2/, etc + * todo: fix comment * The way this currently works is that CTAS creates an Acid table but the insert statement writes * the data in non-acid layout. Then on read, it's treated like an non-acid to acid conversion. * Longer term CTAS should create acid layout from the get-go. @@ -414,8 +414,15 @@ public void testNonStandardConversion02() throws Exception { @Test public void testCtasTezUnion() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + confForTez.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); setupTez(confForTez); //CTAS with ACID target table + List rs0 = runStatementOnDriver("explain create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); + LOG.warn("explain ctas:");//TezEdgeProperty.EdgeType + for (String s : rs0) { + LOG.warn(s); + } runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); @@ -425,14 +432,14 @@ public void testCtasTezUnion() throws Exception { } Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); /* - * Expected result 0th entry i the RecordIdentifier + data. 1st entry file before compact*/ + * Expected result 0th entry is the RecordIdentifier + data. 
1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0002/bucket_00000"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -449,10 +456,10 @@ public void testCtasTezUnion() throws Exception { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"} + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000020_0000020_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -464,7 +471,7 @@ public void testCtasTezUnion() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"}; + String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta.length; i++) { if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { @@ -493,7 +500,7 @@ public void testCtasTezUnion() throws Exception { //check we have right delete delta files after minor compaction status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"}; + String[] expectedDelDelta2 = { 
"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta2.length; i++) { if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { @@ -517,8 +524,52 @@ public void testCtasTezUnion() throws Exception { for(int i = 0; i < expected2.length; i++) { Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); //everything is now in base/ - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000")); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000021/bucket_00000")); + } + } + @Test + public void testUnionRemove() throws Exception {//todo HIVE-17505 +// hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); +// hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); +// d.close(); +// d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + TestTxnCommands2.makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')");//todo: try with T bucketd + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + //todo: test file names, etc + rs = runStatementOnDriver("select * from T order by a, b"); + Assert.assertEquals("", TestTxnCommands2.stringifyValues(values), rs); + } + //@Ignore("HIVE-17505") + @Test + public void testToAcidConversionMultiBucket() throws Exception {//todo HIVE-17505 + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + TestTxnCommands2.makeValuesClause(values)); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')");//todo: try with T bucketd + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. 
union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); } + Assert.assertEquals(5, rs.size()); + //this duplicates data } // Ideally test like this should be a qfile test. However, the explain output from qfile is always // slightly different depending on where the test is run, specifically due to file size estimation @@ -613,6 +664,7 @@ private void setupMapJoin(HiveConf conf) { private List runStatementOnDriver(String stmt, HiveConf conf) throws Exception { Driver driver = new Driver(conf); + driver.setMaxRows(10000); CommandProcessorResponse cpr = driver.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f184..8450f527e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -24,7 +24,10 @@ import java.io.PrintStream; import java.io.Serializable; import java.net.InetAddress; +import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -1199,7 +1202,11 @@ private int acquireLocks() { } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { - for (FileSinkDesc desc : plan.getAcidSinks()) { + List acidSinks = new ArrayList<>(plan.getAcidSinks()); + //sorting makes tests easier to write since file names and ROW__IDs depend on statementId + //so this makes (file name -> data) mapping stable + acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName())); + for (FileSinkDesc desc : acidSinks) { desc.setTransactionId(txnMgr.getCurrentTxnId()); //it's possible to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index cde2805142..6ce20392ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -269,9 +270,36 @@ public int execute(DriverContext driverContext) { if (lfd != null) { Path targetPath = lfd.getTargetDir(); Path sourcePath = lfd.getSourcePath(); - moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + if(lfd.getWriteType() == AcidUtils.Operation.INSERT) { + //'targetPath' is table root of un-partitioned table + //'sourcePath' result of 'select ...' part of CTAS statement + //todo: can sourcePath have > 1 file? 
currently, CTAS is not supported for + //bucketed tables so number of writers should be same as number of files... + assert lfd.getIsDfsDir(); + FileSystem srcFs = sourcePath.getFileSystem(conf); + List newFiles = new ArrayList<>(); + /* for Tez+Union ctas + acid target sourcePath looks like + -ext-10002 + ├── 1 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000000_0000000 + │   └── bucket_00000 + └── 2 + └── 000000_0 + ├── _orc_acid_version + └── delta_0000000_0000000 + └── bucket_00000 + */ + List sources = new ArrayList<>(); + Collections.addAll(sources, srcFs.globStatus(new Path(sourcePath , "[0-9]*"))); + sources.add(srcFs.globStatus(sourcePath)[0]); + Hive.moveAcidFiles(srcFs, sources.toArray(new FileStatus[sources.size()]), targetPath, newFiles); + } + else { + moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + } } - // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index feacdd8b60..f031a6972a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -1142,6 +1144,26 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + public static boolean isAcidTable(TableDesc table) { + if (table == null || table.getProperties() == null) { + return false; + } + String tableIsTransactional = table.getProperties().getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = table.getProperties().getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } + public static boolean isAcidTable(CreateTableDesc table) { + if (table == null || table.getTblProps() == null) { + return false; + } + String tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } /** * Sets the acidOperationalProperties in the configuration object argument. diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index aa44c62d88..e6fdcbfb4b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3464,18 +3464,39 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, // If we're moving files around for an ACID write then the rules and paths are all different. // You can blame this on Owen. 
if (isAcid) { + List sources = new ArrayList<>(); + Path pathPattern = new Path(srcf, "[0-9]*");//todo: make the pattern so that it's at least 1 digit? otherwise it seems to pick up foo/0000_0 and foo/00001_0 when using foo/[0-9]*... + try { + /** + * {@link HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE}=true can create a different data + * layout in the table/partition dir: + * T/1/ + * T/2/ + * T/3/.... etc + */ + Collections.addAll(sources, srcFs.globStatus(pathPattern)); + if(!sources.isEmpty()) { + Collections.addAll(sources, srcs); + srcs = sources.toArray(new FileStatus[sources.size()]); + } + } + catch(IOException ex) { + LOG.error("Unable to glob nested dirs: " + pathPattern); + } moveAcidFiles(srcFs, srcs, destf, newFiles); } else { copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles); } } - private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, + public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta|delete_delta/bucket - // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta|delete_delta/bucket. So we need to move that into - // the above structure. For the first mover there will be no delta directory, + // We will always only be writing delta files (except IOW, which writes base_X/). + // In the buckets created by FileSinkOperator + // it will look like original_bucket/delta|delete_delta/bucket + // (e.g. .../-ext-10004/000000_0/delta_0000014_0000014_0000/bucket_00000). So we need to + // move that into the above structure. For the first mover there will be no delta directory, // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. 
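
To illustrate the concern in the todo above: under Hadoop-style glob matching, "[0-9]*" means one digit followed by anything, so it matches the union-removed subdirs 1/, 2/, 3/ but also ordinary bucket files such as 000000_0. A minimal standalone sketch of that over-matching, using plain java.util.regex as a stand-in for FileSystem.globStatus (class name and sample names are illustrative only):

import java.util.Arrays;
import java.util.regex.Pattern;

public class UnionSubdirGlobSketch {
  public static void main(String[] args) {
    // Hadoop's glob "[0-9]*" behaves roughly like the regex "[0-9].*": one digit, then anything.
    Pattern roughEquivalent = Pattern.compile("[0-9].*");
    // "1", "2", "3" stand for the union-removed subdirs the new branch wants to collect;
    // "000000_0" and "00001_0" stand for ordinary bucket files, which also start with a digit.
    for (String name : Arrays.asList("1", "2", "3", "000000_0", "00001_0")) {
      System.out.println(name + " matches: " + roughEquivalent.matcher(name).matches());
    }
    // All five print true, which is the over-matching the todo is pointing at.
  }
}

If only the single-level union subdirs are wanted, a stricter digits-only pattern (along the lines of the regex [0-9]+ with no trailing wildcard) would exclude the bucket files.
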
@@ -3503,7 +3524,7 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, fs, dst, origBucketPath, createdDeltaDirs, newFiles); moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, fs, dst,origBucketPath, createdDeltaDirs, newFiles); - moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter, + moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for Insert Overwrite fs, dst, origBucketPath, createdDeltaDirs, newFiles); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java new file mode 100644 index 0000000000..a90b04a8e8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -0,0 +1,111 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +/** + * This runs rules after the final plan is generated. This should not make modifications to + * the plan. 
+ */ +public class QueryPlanPostProcessor { + private ParseContext pctx; + private List transformations = new ArrayList<>(); + private static final Logger LOG = LoggerFactory.getLogger(QueryPlanPostProcessor.class); + + public QueryPlanPostProcessor(ParseContext pCtx) { + this.pctx = pCtx; + } + public void initialize(HiveConf hiveConf) { + transformations.add(new FileSinkDescCollector()); + } + public ParseContext optimize() throws SemanticException { + for (Transform t : transformations) { + t.beginPerfLogging(); + pctx = t.transform(pctx); + t.endPerfLogging(t.toString()); + } + return pctx; + } + private static class FileSinkDescCollector extends Transform { + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + Map opRules = new LinkedHashMap(); + + String FS = FileSinkOperator.getOperatorName() + "%"; + + opRules.put(new RuleRegExp("FileSinkDesc Collector", FS), new MyNodeProcessor(pctx)); + + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null); + GraphWalker ogw = new DefaultGraphWalker(disp); + + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + return pctx; + } + private class MyNodeProcessor implements NodeProcessor { + + private final Logger LOG = LoggerFactory.getLogger(MyNodeProcessor.class); + protected ParseContext parseCtx; + + private MyNodeProcessor(ParseContext pCtx) { + this.parseCtx = pCtx; + } + + /** + * There are optimizations like removing the UnionAll operator that create new FileSinkOperators + * @param nd + * operator to process + * @param stack + * @param procCtx + * operator processor context + * @param nodeOutputs + * A variable argument list of outputs from other nodes in the walk + * @return + * @throws SemanticException + */ + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + FileSinkOperator fsOp = (FileSinkOperator) nd; + if(fsOp.getConf().getWriteType() != AcidUtils.Operation.NOT_ACID) { + parseCtx.getAcidSinks().add(fsOp.getConf()); + } + if(fsOp.getConf().isLinkedFileSink()) { + /* + There is something fishy here: Consider TestTxnNoBuckets.testUnionRemove() + process() does not visit all the FileSinkOperatorS in the plan somehow + The plan must be disconnected? 
todo: print it out in transform()*/ for(FileSinkDesc fsd : fsOp.getConf().getLinkedFileSinkDesc()) { + if(fsd.getWriteType() != AcidUtils.Operation.NOT_ACID) { + parseCtx.getAcidSinks().add(fsd); + } + } + } + return null; + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java index 2a7f3d4f03..3bc17f87e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java @@ -32,11 +32,13 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.session.SessionState; /** * Operator factory for union processing. @@ -217,6 +219,9 @@ private void pushOperatorsAboveUnion(UnionOperator union, // each parent List fileDescLists = new ArrayList(); + SessionState ss = SessionState.get(); + HiveTxnManager txnMgr = ss.getTxnMgr();//todo: this doesn't get called... for tez see GenTezUtils + for (Operator parent : parents) { FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone(); fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier())); @@ -227,12 +232,19 @@ private void pushOperatorsAboveUnion(UnionOperator union, OperatorFactory.getAndMakeChild(fileSinkDesc, parent.getSchema(), parent); tmpFileSinkOp.setChildOperators(null); fileDescLists.add(fileSinkDesc); + if(txnMgr != null && false) { + //assert txnMgr.isTxnOpen();//seems threadLocal is inherited from tests testing txns to others + if(txnMgr.isTxnOpen()) {//todo: this is kind of a hack since these FileSinks don't get added to QueryPlan acid sinks and the original FS.linkedFileSinkDesc is not set + fileSinkDesc.setStatementId(txnMgr.getWriteIdAndIncrement()); + fileSinkDesc.setTransactionId(txnMgr.getCurrentTxnId()); + } + } } for (FileSinkDesc fileDesc : fileDescLists) { fileDesc.setLinkedFileSinkDesc(fileDescLists); } - + //todo: should this also do fileSinkOp.getConf().setLinkedFileSinkDesc(fileDescLists); // delink union union.setChildOperators(null); union.setParentOperators(null); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 1b0a2f0661..5d6efec021 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -38,10 +38,13 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lib.*; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.*; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -314,7 +317,12 @@ public static void removeUnionOperators(GenTezProcContext context, BaseWork work linked = context.linkedFileSinks.get(path); linked.add(desc); - desc.setDirName(new Path(path, "" + linked.size())); + desc.setDirName(new Path(path, "" + linked.size()));//todo: so this is where we effectively assign the ID to the branch + if(desc.getWriteType() != AcidUtils.Operation.NOT_ACID) { + if(!context.parseContext.getAcidSinks().contains(desc)) { + //context.parseContext.getAcidSinks().add(desc);//this is hit, but seems to be operating on a clone of some context + } + } desc.setLinkedFileSink(true); desc.setParentDir(path); desc.setLinkedFileSinkDesc(linked); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1c74779dec..961cea955a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -131,6 +131,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.Optimizer; +import org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor; import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature; @@ -7129,6 +7130,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) viewDesc.setSchema(new ArrayList(field_schemas)); } + destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc); + boolean isDestTempFile = true; if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) { idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString()); @@ -7139,8 +7142,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, - colTypes)); - + colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID)); +//todo: what about WriteEntity.writeType? does it matter here? We could lock the DB of the new table so that it doesn't get dropped.... if (tblDesc == null) { if (viewDesc != null) { table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes); @@ -7171,7 +7174,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } else { table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes); } - +//todo: should this WriteEntity have writeType for locks acquisition? if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_path.toUri().toString())); @@ -7210,7 +7213,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } RowSchema fsRS = new RowSchema(vecCol); - +//todo: what does this mean? concat? // The output files of a FileSink can be merged if they are either not being written to a table // or are being written to a table which is not bucketed // and table the table is not sorted @@ -7248,13 +7251,23 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) (deleting(dest) ? 
AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); - String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); - Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); - if (iowMap.containsKey(destTableFullName)) { - fileSinkDesc.setInsertOverwrite(true); + if(dest_type.intValue() == QBMetaData.DEST_PARTITION || dest_type.intValue() == QBMetaData.DEST_TABLE) { + String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); + Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); + if (iowMap.containsKey(destTableFullName)) { + fileSinkDesc.setInsertOverwrite(true); + } } acidFileSinks.add(fileSinkDesc); } + if(false && AcidUtils.isAcidTable(table_desc)) { + //todo: this must be CTAS - assert it + //so this makes it use OrcRecordUpdater and write /warehouse/.hive-staging_hive_2017-09-06_14-01-47_464_6778553204482671696-1/_task_tmp.-ext-10004/_tmp.000000_0/delta_0000000_0000000/bucket_00000 + //but the final table is empty.... +// checkAcidConstraints(qb, table_desc, dest_tab);//no dest_tab here so NPE + fileSinkDesc.setWriteType(Operation.INSERT); + acidFileSinks.add(fileSinkDesc); + } fileSinkDesc.setTemporary(destTableIsTemporary); fileSinkDesc.setMaterialization(destTableIsMaterialization); @@ -11497,6 +11510,10 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce compiler.compile(pCtx, rootTasks, inputs, outputs); fetchTask = pCtx.getFetchTask(); } + QueryPlanPostProcessor qppp = new QueryPlanPostProcessor(pCtx); + qppp.initialize(conf); + qppp.optimize(); + getAcidFileSinks().addAll(pCtx.getAcidSinks()); LOG.info("Completed plan generation"); // 10. put accessed columns to readEntity diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 15836ecf48..7782bc000a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -559,6 +559,7 @@ protected void generateTaskTree(List> rootTasks, Pa for (BaseWork w: procCtx.workWithUnionOperators) { GenTezUtils.removeUnionOperators(procCtx, w, indexForTezUnion++); } + pCtx.getAcidSinks().addAll(procCtx.parseContext.getAcidSinks()); // then we make sure the file sink operators are set up right for (FileSinkOperator fileSink: procCtx.fileSinkSet) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 03202fb7f5..d451413237 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.PTFUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; /** * LoadFileDesc. @@ -36,6 +36,14 @@ private String columnTypes; private String destinationCreateTable; + //todo: push up to parent? (and LoadTableDesc) + // Need to remember whether this is an acid compliant operation, and if so whether it is an + // insert, update, or delete. 
+ private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; + public AcidUtils.Operation getWriteType() { + return writeType; + } + public LoadFileDesc() { } @@ -51,7 +59,7 @@ public LoadFileDesc(final LoadFileDesc o) { public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc createViewDesc, final Path sourcePath, final Path targetDir, final boolean isDfsDir, - final String columns, final String columnTypes) { + final String columns, final String columnTypes, AcidUtils.Operation writeType) { this(sourcePath, targetDir, isDfsDir, columns, columnTypes); if (createTableDesc != null && createTableDesc.getDatabaseName() != null && createTableDesc.getTableName() != null) { @@ -63,6 +71,7 @@ public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc // qualified. destinationCreateTable = createViewDesc.getViewName(); } + this.writeType = writeType; } public LoadFileDesc(final Path sourcePath, final Path targetDir, diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index b695f0f809..bbed9be82e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -22,12 +22,12 @@ public class TezEdgeProperty { - public enum EdgeType { - SIMPLE_EDGE, + public enum EdgeType {//todo: HIVE-15549 + SIMPLE_EDGE,//SORT_PARTITION_EDGE BROADCAST_EDGE, - CONTAINS, - CUSTOM_EDGE, - CUSTOM_SIMPLE_EDGE, + CONTAINS,//used for union (all?) + CUSTOM_EDGE,//CO_PARTITION_EDGE + CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE ONE_TO_ONE_EDGE } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 1f0c269f5e..ef1b84371b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -154,7 +154,7 @@ public void testNoBuckets() throws Exception { /** * all of these pass but don't do exactly the right thing * files land as if it's not an acid table "warehouse/myctas4/000000_0" - * even though in {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires + * even though {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires * and sees it as transactional table * look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer * @@ -169,6 +169,7 @@ public void testCTAS() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL); + //todo: look at the files - make sure CTAS did an Acid write List rs = runStatementOnDriver("select * from myctas order by a, b"); Assert.assertEquals(stringifyValues(values), rs); @@ -190,9 +191,84 @@ public void testCTAS() throws Exception { rs = runStatementOnDriver("select * from myctas4 order by a, b"); Assert.assertEquals(stringifyValues(values), rs); } + + /** + * I suspect the reason that when this is done over non acid T, there is some merge logic that runs + * at the end and generates a new output dir but the 'move' logic doesn't know it. + * Acid disables the merge logic. 
+ * See SemanticAnalyzer.genFileSink() around "canBeMerged" - maybe related or something similar elsewhere + * @throws Exception + */ + @Test + public void testUnionRemove() throws Exception {//todo HIVE-17505 + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + d.close(); + d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')");//todo: try with T bucketed + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + //todo: test file names, etc + rs = runStatementOnDriver("select * from T order by a, b"); + Assert.assertEquals("", TestTxnCommands2.stringifyValues(values), rs); + } + + /** + * This tests converting an unbucketed non-acid table to acid where the starting state was created + * by multiple concurrent writers: + * T + *   ├── 000000_0 + *   └── 000001_0 + * as opposed to serial writers (sequential insert statements) + * T + *   ├── 000000_0 + *   └── 000000_0_copy_1 + */ + @Test + public void testToAcidConversionMultiBucket() throws Exception {//todo HIVE-17505 + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + d.close(); + d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')");//todo: try with T bucketed + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. 
union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + } /** * see HIVE-16177 - * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} todo need test with > 1 bucket file + * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} + * todo need test with > 1 bucket file + * easiest to use union all (foo is non acid) + * runStatementOnDriver("insert into foo select a,b from (" + "select a, b, INPUT__FILE__NAME from " + Table.ACIDTBL + " where a <= 5 union all select a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " where a >= 5" + ") S order by a, b"); + * this creates + * foo + *   ├── 000000_0 + *   └── 000001_0 + * layout + * + * what does this do for acid? delta_x_x_0 and delta_x_x_1? */ @Test public void testToAcidConversion02() throws Exception {
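
A note on the bucketid values asserted in the expected ROW__ID arrays earlier in this patch (536870912, 536870913, 536870914): they are encoded bucket properties, and in the expected data the low bits line up with the _0001/_0002 statement-id suffix on the delta directories. A small self-contained sketch of the decoding, assuming the V1 bit layout used by BucketCodec (the helper class below is made up for illustration; only determineVersion()/decodeWriterId() appear in the patch itself, so treat the exact masks as an assumption):

public class BucketPropertySketch {
  // Assumed V1 layout: top 3 bits = codec version, bits 16..27 = writer/bucket id,
  // low 12 bits = statement id. Verify against BucketCodec in the tree if these drift.
  static int decodeVersion(int p)     { return p >>> 29; }
  static int decodeWriterId(int p)    { return (p & 0x0FFF0000) >>> 16; }
  static int decodeStatementId(int p) { return p & 0x00000FFF; }

  public static void main(String[] args) {
    for (int p : new int[] {536870912, 536870913, 536870914}) {
      System.out.println(p + " -> version=" + decodeVersion(p)
          + " writerId=" + decodeWriterId(p)
          + " statementId=" + decodeStatementId(p));
    }
    // 536870912 -> version=1 writerId=0 statementId=0
    // 536870913 -> version=1 writerId=0 statementId=1  (paired with delta_..._0001 above)
    // 536870914 -> version=1 writerId=0 statementId=2  (paired with delta_..._0002 above)
  }
}
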