diff --git pom.xml pom.xml index 376197e..603d98b 100644 --- pom.xml +++ pom.xml @@ -101,7 +101,7 @@ 2.4 2.4 2.4.3 - 2.19.1 + 2.18.1 2.4 2.8 2.9 diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index e2363eb..0da0c25 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -140,6 +140,11 @@ public static ReduceWork createReduceWork( TezEdgeProperty edgeProp; EdgeType edgeType = determineEdgeType(context.preceedingWork, reduceWork); + if(edgeType != EdgeType.CUSTOM_SIMPLE_EDGE) { + if(reduceSink.getConf().getKeyCols().isEmpty()) { + edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; + } + } if (reduceWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, edgeType, true, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index cdb9e1b..01d622b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -514,6 +514,13 @@ protected void optimizeTaskPlan(List> rootTasks, Pa } else { LOG.debug("Skipping llap decider"); } + for(Task t : rootTasks) { + if(!(t instanceof TezTask)) continue; + TezWork tezWork = ((TezTask)t).getWork(); + if(tezWork == null) continue; + //tezWork.optimzeEdges(); + } + // This optimizer will serialize all filters that made it to the // table scan operator to avoid having to do it multiple times on diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 7a70e6b..39fd7e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -81,6 +81,23 @@ public static boolean isCustomInputType(VertexType vertex) { new HashMap, TezEdgeProperty>(); private final Map workVertexTypeMap = new HashMap(); + public void optimzeEdges() { + for(Map.Entry, TezEdgeProperty> ent : edgeProperties.entrySet()) { + if(ent.getKey().getRight() instanceof ReduceWork) { + ReduceWork reduceWork = (ReduceWork)ent.getKey().getRight(); + if(!reduceWork.getSortCols().isEmpty()) { + continue; + } + TezEdgeProperty e = ent.getValue(); + if(e.getEdgeType() == EdgeType.SIMPLE_EDGE) { + TezEdgeProperty newEdge = new TezEdgeProperty(e.getHiveConf(), EdgeType.CUSTOM_SIMPLE_EDGE, + e.isAutoReduce(), e.getMinReducer(), e.getMaxReducer(), e.getInputSizePerReducer()); + ent.setValue(newEdge); + } + } + } + } + public TezWork(String queryId) { this(queryId, null); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 9bfcc82..b451567 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -737,15 +737,25 @@ public void testMergeType2SCD02() throws Exception { Assert.assertEquals(stringifyValues(resultVals), r); } - @Ignore("HIVE-14707") +// @Ignore("HIVE-14707") @Test public void testMergeInsertOnly() throws Exception { String query = "merge into " + Table.ACIDTBL + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + + "WHEN MATCHED THEN DELETE " + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; + +// merge into acidTbl as t using nonAcidOrcTbl s ON t.a = s.a +// WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) + +// runStatementOnDriver("CREATE TABLE src (key STRING COMMENT 'default', value STRING COMMENT 'default') STORED AS TEXTFILE"); +// query = "select key, count(key) from src group by key"; +// query = "analyze select key from src where key < 10;"; + d.destroy(); HiveConf hc = new HiveConf(hiveConf); hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); + hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); d = new Driver(hc); d.setMaxRows(10000); @@ -757,6 +767,10 @@ public void testMergeInsertOnly() throws Exception { LOG.info("Explain1: " + sb); } @Test + public void testSortEdge() { + + } + @Test public void testMergeUpdateDelete() throws Exception { int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}}; runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));