diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 3762ee5..94b3a06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -45,6 +45,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Join operator implementation.
  */
@@ -145,6 +147,7 @@
   int joinCacheSize = 0;
   long nextSz = 0;
   transient Byte lastAlias = null;
+  private long logEveryNRows = 0L;
 
   transient boolean handleSkewJoin = false;
 
@@ -170,6 +173,7 @@ public CommonJoinOperator(CommonJoinOperator clone) {
     this.joinEmitInterval = clone.joinEmitInterval;
     this.joinCacheSize = clone.joinCacheSize;
     this.nextSz = clone.nextSz;
+    this.logEveryNRows = clone.logEveryNRows;
     this.childOperators = clone.childOperators;
     this.parentOperators = clone.parentOperators;
     this.done = clone.done;
@@ -294,6 +298,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     joinCacheSize = HiveConf.getIntVar(hconf,
         HiveConf.ConfVars.HIVEJOINCACHESIZE);
 
+    logEveryNRows = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
+
     // construct dummy null row (indicating empty table) and
     // construct spill table serde which is used if input is too
     // large to fit into main memory.
@@ -394,13 +401,22 @@ public void startGroup() throws HiveException {
     super.startGroup();
   }
 
+  /**
+   * Determine the frequency with which to emit a log message instead of
+   * one for every event.
+   *
+   * @param sz The current number of events
+   * @return The next event count at which to emit a log message
+   */
   protected long getNextSize(long sz) {
-    // A very simple counter to keep track of join entries for a key
-    if (sz >= 100000) {
-      return sz + 100000;
+    Preconditions.checkArgument(sz >= 0L);
+    // If no logging interval is configured, log at 1, 10, 100, 1000, ..., 100000
+    if (this.logEveryNRows == 0L) {
+      final long next = (long) Math.pow(10.0, Math.ceil(Math.log10(sz + 1)));
+      return Math.min(100000L, next);
     }
-
-    return 2 * sz;
+    // Log every N rows
+    return ((sz / this.logEveryNRows) + 1L) * this.logEveryNRows;
   }
 
   protected transient Byte alias;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index e995ab7..451ba1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -82,10 +82,6 @@ public void process(Object row, int tag) throws HiveException {
       lastAlias = alias;
       alias = (byte) tag;
 
-      if (!alias.equals(lastAlias)) {
-        nextSz = joinEmitInterval;
-      }
-
       List nr = getFilteredValue(alias, row);
 
       if (handleSkewJoin) {
@@ -93,7 +89,7 @@ public void process(Object row, int tag) throws HiveException {
       }
 
       // number of rows for the key in the given table
-      long sz = storage[alias].rowCount();
+      final long sz = storage[alias].rowCount();
       StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
       StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
          .toString());
@@ -112,14 +108,16 @@
           checkAndGenObject();
           storage[alias].clearRows();
         }
-      } else {
-        if (LOG.isInfoEnabled() && (sz == nextSz)) {
-          // Print a message if we reached at least 1000 rows for a join operand
-          // We won't print a message for the last join operand since the size
-          // will never goes to joinEmitInterval.
-          LOG.info("table " + alias + " has " + sz + " rows for join key " + keyObject);
-          nextSz = getNextSize(nextSz);
-        }
+      }
+
+      // The input is sorted by alias, so when an alias change is detected,
+      // reset the counter for the next join key in the stream
+      if (!alias.equals(lastAlias)) {
+        nextSz = getNextSize(0L);
+      }
+      if (sz == nextSz) {
+        LOG.info("Table {} has {} rows for join key {}", alias, sz, keyObject);
+        nextSz = getNextSize(nextSz);
       }
 
       // Add the value to the vector
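
For reviewers, here is a small standalone sketch of the logging cadence the patched getNextSize() produces. The NextSizeDemo class name and its main() driver are hypothetical and not part of the patch; the method body only mirrors the arithmetic shown in the hunk above, with logEveryNRows passed as a parameter instead of read from HiveConf.

```java
// Standalone sketch mirroring the patched CommonJoinOperator.getNextSize();
// class name and main() are illustrative only, not part of the patch.
public class NextSizeDemo {

  /** Same arithmetic as the patch, with logEveryNRows passed explicitly. */
  static long getNextSize(long sz, long logEveryNRows) {
    if (logEveryNRows == 0L) {
      // No interval configured: next power of ten above sz, capped at 100000
      final long next = (long) Math.pow(10.0, Math.ceil(Math.log10(sz + 1)));
      return Math.min(100000L, next);
    }
    // Interval configured: next multiple of logEveryNRows strictly above sz
    return ((sz / logEveryNRows) + 1L) * logEveryNRows;
  }

  public static void main(String[] args) {
    long next = getNextSize(0L, 0L);
    for (int i = 0; i < 6; i++) {
      System.out.println(next);          // 1, 10, 100, 1000, 10000, 100000
      next = getNextSize(next, 0L);
    }
    System.out.println(getNextSize(250L, 1000L));   // 1000
    System.out.println(getNextSize(1000L, 1000L));  // 2000
  }
}
```

If I read the arithmetic right, the default mode (logEveryNRows == 0) emits at most a handful of messages per alias, since the threshold stops growing once it reaches 100000, whereas an explicit interval keeps logging every N rows; both are cheaper than the old doubling-then-every-100000 behavior.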