diff --git data/files/kv6.txt data/files/kv6.txt new file mode 100644 index 0000000..63de5c5 --- /dev/null +++ data/files/kv6.txt @@ -0,0 +1,100 @@ +00 +01 +02 +03 +04 +05 +06 +07 +08 +09 +010 +011 +012 +013 +014 +015 +016 +017 +018 +019 +020 +021 +022 +023 +024 +025 +026 +027 +028 +029 +030 +031 +032 +033 +034 +035 +036 +037 +038 +039 +040 +041 +042 +043 +044 +045 +046 +047 +048 +049 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +110 +111 +112 +113 +114 +115 +116 +117 +118 +119 +120 +121 +122 +123 +124 +125 +126 +127 +128 +129 +130 +131 +132 +133 +134 +135 +136 +137 +138 +139 +140 +141 +142 +143 +144 +145 +146 +147 +148 +149 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index 69a69ff..aff26f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -67,7 +67,6 @@ public abstract class AbstractMapJoinOperator extends Co }; transient boolean firstRow; - transient int heartbeatInterval; public AbstractMapJoinOperator() { } @@ -82,8 +81,6 @@ public abstract class AbstractMapJoinOperator extends Co numMapRowsRead = 0; firstRow = true; - heartbeatInterval = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVESENDHEARTBEAT); joinKeys = new HashMap>(); @@ -129,14 +126,6 @@ public abstract class AbstractMapJoinOperator extends Co + FATAL_ERR_MSG[(int) counterCode]); } - protected void reportProgress() { - // Send some status periodically - numMapRowsRead++; - if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) { - reporter.progress(); - } - } - @Override public int getType() { return OperatorType.MAPJOIN; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 2b955d3..7ad4aac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -24,8 +24,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,8 +44,8 @@ import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.util.ReflectionUtils; @@ -140,6 +140,9 @@ public abstract class CommonJoinOperator extends transient boolean handleSkewJoin = false; + protected transient int countAfterReport; + protected transient int heartbeatInterval; + public CommonJoinOperator() { } @@ -266,6 +269,11 @@ public abstract class CommonJoinOperator extends protected void initializeOp(Configuration hconf) throws HiveException { this.handleSkewJoin = conf.getHandleSkewJoin(); this.hconf = hconf; + + heartbeatInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVESENDHEARTBEAT); + countAfterReport = 0; + totalSz = 0; // Map that contains the rows for each alias storage = new HashMap>>(); @@ -480,7 +488,9 @@ public abstract class CommonJoinOperator extends } } } + forward(forwardCache, outputObjInspector); + countAfterReport = 0; } private void copyOldArray(boolean[] src, boolean[] dest) { @@ -816,6 +826,7 @@ public abstract class CommonJoinOperator extends } forward(forwardCache, outputObjInspector); + countAfterReport = 0; return; } @@ -878,6 +889,17 @@ public abstract class CommonJoinOperator extends } } + protected void reportProgress() { + // Send some status periodically + countAfterReport++; + + if ((countAfterReport % heartbeatInterval) == 0 + && (reporter != null)) { + reporter.progress(); + countAfterReport = 0; + } + } + /** * All done. * diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 59788c2..6d57ee5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -151,6 +151,8 @@ public class GroupByOperator extends Operator implements transient int totalVariableSize; transient int numEntriesVarSize; transient int numEntriesHashTable; + transient int countAfterReport; + transient int heartbeatInterval; @Override protected void initializeOp(Configuration hconf) throws HiveException { @@ -158,6 +160,10 @@ public class GroupByOperator extends Operator implements numRowsInput = 0; numRowsHashTbl = 0; + heartbeatInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVESENDHEARTBEAT); + countAfterReport = 0; + assert (inputObjInspectors.length == 1); ObjectInspector rowInspector = inputObjInspectors[0]; @@ -291,7 +297,7 @@ public class GroupByOperator extends Operator implements * the total amount of memory to be used by the map-side hash. By default, all * available memory is used. The size of each row is estimated, rather * crudely, and the number of entries are figure out based on that. - * + * * @return number of entries that can fit in hash table - useful for map-side * aggregation only **/ @@ -311,7 +317,7 @@ public class GroupByOperator extends Operator implements * datatype is of variable length, STRING, a list of such key positions is * maintained, and the size for such positions is then actually calculated at * runtime. - * + * * @param pos * the position of the key * @param c @@ -342,7 +348,7 @@ public class GroupByOperator extends Operator implements * field is of variable length, STRING, a list of such field names for the * field position is maintained, and the size for such positions is then * actually calculated at runtime. - * + * * @param pos * the position of the key * @param c @@ -454,15 +460,15 @@ public class GroupByOperator extends Operator implements * whether it has changed. As a cleanup, the lastInvoke logic can be pushed in * the caller, and this function can be independent of that. The client should * always notify whether it is a different row or not. - * + * * @param aggs the aggregations to be evaluated - * + * * @param row the row being processed - * + * * @param rowInspector the inspector for the row - * + * * @param hashAggr whether hash aggregation is being performed or not - * + * * @param newEntryForHashAggr only valid if it is a hash aggregation, whether * it is a new entry or not */ @@ -545,6 +551,8 @@ public class GroupByOperator extends Operator implements } try { + countAfterReport++; + // Compute the keys newKeys.clear(); for (int i = 0; i < keyFields.length; i++) { @@ -562,6 +570,12 @@ public class GroupByOperator extends Operator implements } firstRowInGroup = false; + + if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0 + && (reporter != null)) { + reporter.progress(); + countAfterReport = 0; + } } catch (HiveException e) { throw e; } catch (Exception e) { @@ -695,6 +709,7 @@ public class GroupByOperator extends Operator implements // Forward the current keys if needed for sort-based aggregation if (currentKeys != null && !keysAreEqual) { forward(currentKeys, aggregations); + countAfterReport = 0; } // Need to update the keys? @@ -724,7 +739,7 @@ public class GroupByOperator extends Operator implements /** * Based on user-parameters, should the hash table be flushed. - * + * * @param newKeys * keys for the row under consideration **/ @@ -809,6 +824,7 @@ public class GroupByOperator extends Operator implements forward(m.getKey().keys, m.getValue()); iter.remove(); numDel++; + countAfterReport = 0; if (numDel * 10 >= oldSize) { LOG.warn("Hash Table flushed: new size = " + hashAggregations.size()); return; @@ -820,7 +836,7 @@ public class GroupByOperator extends Operator implements /** * Forward a record of keys and aggregation results. - * + * * @param keys * The keys in the record * @throws HiveException @@ -843,7 +859,7 @@ public class GroupByOperator extends Operator implements /** * We need to forward all the aggregations to children. - * + * */ @Override public void closeOp(boolean abort) throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index d89a7f8..877e811 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -68,6 +68,7 @@ public class JoinOperator extends CommonJoinOperator implements @Override public void processOp(Object row, int tag) throws HiveException { try { + reportProgress(); // get alias alias = (byte) tag; @@ -116,7 +117,6 @@ public class JoinOperator extends CommonJoinOperator implements // Add the value to the vector storage.get(alias).add(nr); - } catch (Exception e) { e.printStackTrace(); throw new HiveException(e); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 2ac6262..1cce302 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -160,11 +160,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator implem @Override public void processOp(Object row, int tag) throws HiveException { - + if (tag == posBigTable) { this.getExecContext().processInputFileChangeForLocalWork(); } - + try { // get alias alias = (byte) tag; @@ -201,6 +201,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator implem } reportProgress(); + numMapRowsRead++; if ((numMapRowsRead > maxMapJoinSize) && (reporter != null) && (counterNameToEnum != null)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index dc74532..10c8867 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -34,8 +34,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; @@ -64,8 +64,8 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp HashMap>> candidateStorage; transient HashMap tagToAlias; - private transient HashMap fetchOpDone = new HashMap(); - private transient HashMap foundNextKeyGroup = new HashMap(); + private transient final HashMap fetchOpDone = new HashMap(); + private transient final HashMap foundNextKeyGroup = new HashMap(); transient boolean firstFetchHappened = false; transient boolean localWorkInited = false; @@ -125,8 +125,8 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp localWorkInited = true; this.localWork = localWork; fetchOperators = new HashMap(); - - Map fetchOpJobConfMap = new HashMap(); + + Map fetchOpJobConfMap = new HashMap(); // create map local operators for (Map.Entry entry : localWork.getAliasToFetchWork() .entrySet()) { @@ -137,7 +137,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp ArrayList list = ((TableScanOperator)tableScan).getNeededColumnIDs(); if (list != null) { ColumnProjectionUtils.appendReadColumnIDs(jobClone, list); - } + } } else { ColumnProjectionUtils.setFullyReadColumns(jobClone); } @@ -213,6 +213,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp } reportProgress(); + numMapRowsRead++; // the big table has reached a new key group. try to let the small tables // catch up with the big table. @@ -256,6 +257,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator imp break; } reportProgress(); + numMapRowsRead++; allFetchOpDone = allFetchOpDone(); } diff --git ql/src/test/queries/clientpositive/progress_1.q ql/src/test/queries/clientpositive/progress_1.q new file mode 100644 index 0000000..06051cb --- /dev/null +++ ql/src/test/queries/clientpositive/progress_1.q @@ -0,0 +1,9 @@ +set hive.heartbeat.interval=5; + +DROP TABLE PROGRESS_1; +CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE; +LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1; + +select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key; + +DROP TABLE PROGRESS_1; diff --git ql/src/test/results/clientpositive/progress_1.q.out ql/src/test/results/clientpositive/progress_1.q.out new file mode 100644 index 0000000..b8cbf30 --- /dev/null +++ ql/src/test/results/clientpositive/progress_1.q.out @@ -0,0 +1,28 @@ +PREHOOK: query: DROP TABLE PROGRESS_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE PROGRESS_1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@PROGRESS_1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1 +PREHOOK: type: LOAD +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@progress_1 +PREHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key +PREHOOK: type: QUERY +PREHOOK: Input: default@progress_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000 +POSTHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@progress_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000 +5000 +PREHOOK: query: DROP TABLE PROGRESS_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE PROGRESS_1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@progress_1 -- 1.6.0.4.609.g474fc