diff --git pom.xml pom.xml
index 376197e..603d98b 100644
--- pom.xml
+++ pom.xml
@@ -101,7 +101,7 @@
2.4
2.4
2.4.3
- 2.19.1
+ 2.18.1
2.4
2.8
2.9
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index e2363eb..0da0c25 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -140,6 +140,11 @@ public static ReduceWork createReduceWork(
TezEdgeProperty edgeProp;
EdgeType edgeType = determineEdgeType(context.preceedingWork, reduceWork);
+ if(edgeType != EdgeType.CUSTOM_SIMPLE_EDGE) {
+ if(reduceSink.getConf().getKeyCols().isEmpty()) {
+ edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+ }
+ }
if (reduceWork.isAutoReduceParallelism()) {
edgeProp =
new TezEdgeProperty(context.conf, edgeType, true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index cdb9e1b..01d622b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -514,6 +514,13 @@ protected void optimizeTaskPlan(List> rootTasks, Pa
} else {
LOG.debug("Skipping llap decider");
}
+ for(Task t : rootTasks) {
+ if(!(t instanceof TezTask)) continue;
+ TezWork tezWork = ((TezTask)t).getWork();
+ if(tezWork == null) continue;
+ //tezWork.optimzeEdges();
+ }
+
// This optimizer will serialize all filters that made it to the
// table scan operator to avoid having to do it multiple times on
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 7a70e6b..39fd7e7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -81,6 +81,23 @@ public static boolean isCustomInputType(VertexType vertex) {
new HashMap, TezEdgeProperty>();
private final Map workVertexTypeMap = new HashMap();
+ public void optimzeEdges() {
+ for(Map.Entry, TezEdgeProperty> ent : edgeProperties.entrySet()) {
+ if(ent.getKey().getRight() instanceof ReduceWork) {
+ ReduceWork reduceWork = (ReduceWork)ent.getKey().getRight();
+ if(!reduceWork.getSortCols().isEmpty()) {
+ continue;
+ }
+ TezEdgeProperty e = ent.getValue();
+ if(e.getEdgeType() == EdgeType.SIMPLE_EDGE) {
+ TezEdgeProperty newEdge = new TezEdgeProperty(e.getHiveConf(), EdgeType.CUSTOM_SIMPLE_EDGE,
+ e.isAutoReduce(), e.getMinReducer(), e.getMaxReducer(), e.getInputSizePerReducer());
+ ent.setValue(newEdge);
+ }
+ }
+ }
+ }
+
public TezWork(String queryId) {
this(queryId, null);
}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 9bfcc82..b451567 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -737,15 +737,25 @@ public void testMergeType2SCD02() throws Exception {
Assert.assertEquals(stringifyValues(resultVals), r);
}
- @Ignore("HIVE-14707")
+// @Ignore("HIVE-14707")
@Test
public void testMergeInsertOnly() throws Exception {
String query = "merge into " + Table.ACIDTBL +
" as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED THEN DELETE " +
"WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+
+// merge into acidTbl as t using nonAcidOrcTbl s ON t.a = s.a
+// WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b)
+
+// runStatementOnDriver("CREATE TABLE src (key STRING COMMENT 'default', value STRING COMMENT 'default') STORED AS TEXTFILE");
+// query = "select key, count(key) from src group by key";
+// query = "analyze select key from src where key < 10;";
+
d.destroy();
HiveConf hc = new HiveConf(hiveConf);
hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+ hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
d = new Driver(hc);
d.setMaxRows(10000);
@@ -757,6 +767,10 @@ public void testMergeInsertOnly() throws Exception {
LOG.info("Explain1: " + sb);
}
@Test
+ public void testSortEdge() {
+
+ }
+ @Test
public void testMergeUpdateDelete() throws Exception {
int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));