diff --git data/files/kv6.txt data/files/kv6.txt new file mode 100644 index 0000000..63de5c5 --- /dev/null +++ data/files/kv6.txt @@ -0,0 +1,100 @@ +00 +01 +02 +03 +04 +05 +06 +07 +08 +09 +010 +011 +012 +013 +014 +015 +016 +017 +018 +019 +020 +021 +022 +023 +024 +025 +026 +027 +028 +029 +030 +031 +032 +033 +034 +035 +036 +037 +038 +039 +040 +041 +042 +043 +044 +045 +046 +047 +048 +049 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +110 +111 +112 +113 +114 +115 +116 +117 +118 +119 +120 +121 +122 +123 +124 +125 +126 +127 +128 +129 +130 +131 +132 +133 +134 +135 +136 +137 +138 +139 +140 +141 +142 +143 +144 +145 +146 +147 +148 +149 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 2b955d3..8eeccf1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -140,6 +140,9 @@ public abstract class CommonJoinOperator extends transient boolean handleSkewJoin = false; + protected transient int countAfterReport; + protected transient int heartbeatInterval; + public CommonJoinOperator() { } @@ -266,6 +269,11 @@ public abstract class CommonJoinOperator extends protected void initializeOp(Configuration hconf) throws HiveException { this.handleSkewJoin = conf.getHandleSkewJoin(); this.hconf = hconf; + + heartbeatInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVESENDHEARTBEAT); + countAfterReport = 0; + totalSz = 0; // Map that contains the rows for each alias storage = new HashMap>>(); @@ -816,6 +824,7 @@ public abstract class CommonJoinOperator extends } forward(forwardCache, outputObjInspector); + countAfterReport = 0; return; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 59788c2..6d57ee5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -151,6 +151,8 @@ public class GroupByOperator extends Operator implements transient int totalVariableSize; transient int numEntriesVarSize; transient int numEntriesHashTable; + transient int countAfterReport; + transient int heartbeatInterval; @Override protected void initializeOp(Configuration hconf) throws HiveException { @@ -158,6 +160,10 @@ public class GroupByOperator extends Operator implements numRowsInput = 0; numRowsHashTbl = 0; + heartbeatInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVESENDHEARTBEAT); + countAfterReport = 0; + assert (inputObjInspectors.length == 1); ObjectInspector rowInspector = inputObjInspectors[0]; @@ -291,7 +297,7 @@ public class GroupByOperator extends Operator implements * the total amount of memory to be used by the map-side hash. By default, all * available memory is used. The size of each row is estimated, rather * crudely, and the number of entries are figure out based on that. - * + * * @return number of entries that can fit in hash table - useful for map-side * aggregation only **/ @@ -311,7 +317,7 @@ public class GroupByOperator extends Operator implements * datatype is of variable length, STRING, a list of such key positions is * maintained, and the size for such positions is then actually calculated at * runtime. - * + * * @param pos * the position of the key * @param c @@ -342,7 +348,7 @@ public class GroupByOperator extends Operator implements * field is of variable length, STRING, a list of such field names for the * field position is maintained, and the size for such positions is then * actually calculated at runtime. - * + * * @param pos * the position of the key * @param c @@ -454,15 +460,15 @@ public class GroupByOperator extends Operator implements * whether it has changed. As a cleanup, the lastInvoke logic can be pushed in * the caller, and this function can be independent of that. The client should * always notify whether it is a different row or not. - * + * * @param aggs the aggregations to be evaluated - * + * * @param row the row being processed - * + * * @param rowInspector the inspector for the row - * + * * @param hashAggr whether hash aggregation is being performed or not - * + * * @param newEntryForHashAggr only valid if it is a hash aggregation, whether * it is a new entry or not */ @@ -545,6 +551,8 @@ public class GroupByOperator extends Operator implements } try { + countAfterReport++; + // Compute the keys newKeys.clear(); for (int i = 0; i < keyFields.length; i++) { @@ -562,6 +570,12 @@ public class GroupByOperator extends Operator implements } firstRowInGroup = false; + + if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0 + && (reporter != null)) { + reporter.progress(); + countAfterReport = 0; + } } catch (HiveException e) { throw e; } catch (Exception e) { @@ -695,6 +709,7 @@ public class GroupByOperator extends Operator implements // Forward the current keys if needed for sort-based aggregation if (currentKeys != null && !keysAreEqual) { forward(currentKeys, aggregations); + countAfterReport = 0; } // Need to update the keys? @@ -724,7 +739,7 @@ public class GroupByOperator extends Operator implements /** * Based on user-parameters, should the hash table be flushed. - * + * * @param newKeys * keys for the row under consideration **/ @@ -809,6 +824,7 @@ public class GroupByOperator extends Operator implements forward(m.getKey().keys, m.getValue()); iter.remove(); numDel++; + countAfterReport = 0; if (numDel * 10 >= oldSize) { LOG.warn("Hash Table flushed: new size = " + hashAggregations.size()); return; @@ -820,7 +836,7 @@ public class GroupByOperator extends Operator implements /** * Forward a record of keys and aggregation results. - * + * * @param keys * The keys in the record * @throws HiveException @@ -843,7 +859,7 @@ public class GroupByOperator extends Operator implements /** * We need to forward all the aggregations to children. - * + * */ @Override public void closeOp(boolean abort) throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index d89a7f8..b5a3eb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -68,6 +68,7 @@ public class JoinOperator extends CommonJoinOperator implements @Override public void processOp(Object row, int tag) throws HiveException { try { + countAfterReport++; // get alias alias = (byte) tag; @@ -117,6 +118,12 @@ public class JoinOperator extends CommonJoinOperator implements // Add the value to the vector storage.get(alias).add(nr); + if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0 + && (reporter != null)) { + reporter.progress(); + countAfterReport = 0; + } + } catch (Exception e) { e.printStackTrace(); throw new HiveException(e); diff --git ql/src/test/queries/clientpositive/progress_1.q ql/src/test/queries/clientpositive/progress_1.q new file mode 100644 index 0000000..06051cb --- /dev/null +++ ql/src/test/queries/clientpositive/progress_1.q @@ -0,0 +1,9 @@ +set hive.heartbeat.interval=5; + +DROP TABLE PROGRESS_1; +CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE; +LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1; + +select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key; + +DROP TABLE PROGRESS_1; diff --git ql/src/test/results/clientpositive/progress_1.q.out ql/src/test/results/clientpositive/progress_1.q.out new file mode 100644 index 0000000..b8cbf30 --- /dev/null +++ ql/src/test/results/clientpositive/progress_1.q.out @@ -0,0 +1,28 @@ +PREHOOK: query: DROP TABLE PROGRESS_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE PROGRESS_1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@PROGRESS_1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1 +PREHOOK: type: LOAD +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@progress_1 +PREHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key +PREHOOK: type: QUERY +PREHOOK: Input: default@progress_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000 +POSTHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@progress_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000 +5000 +PREHOOK: query: DROP TABLE PROGRESS_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE PROGRESS_1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@progress_1 -- 1.6.0.4.609.g474fc