diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cf82e8b..1631e2d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2041,7 +2041,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
         "Name of the SASL mechanism to use for authentication."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
-      "Runs reordering of tables within single n-way join (i.e.: picks streamtable)");
+      "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
+    HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),
+      "If value is greater than 0 logs in fixed intervals of size n rather than exponentially.");
 
   public final String varname;
   private final String defaultExpr;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8b2749c..1826266 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -118,6 +118,7 @@ private IntObjectInspector bucketInspector; // OI for inspecting bucket id
   protected transient long numRows = 0;
   protected transient long cntr = 1;
+  protected transient long logEveryNRows = 0;
 
   /**
    * Counters.
    */
@@ -420,6 +421,8 @@ private void initializeSpecPath() {
   }
 
     numRows = 0;
+    cntr = 1;
+    logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
 
     String suffix = Integer.toString(conf.getDestTableId());
     String fullName = conf.getTableInfo().getTableName();
@@ -705,7 +708,11 @@ public void process(Object row, int tag) throws HiveException {
       }
 
       if ((++numRows == cntr) && isLogInfoEnabled) {
-        cntr *= 10;
+        cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
+        if (cntr < 0 || numRows < 0) {
+          cntr = 0;
+          numRows = 1;
+        }
         LOG.info(toString() + ": records written - " + numRows);
       }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index dc7d821..66e3a77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -34,6 +34,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
@@ -88,6 +89,7 @@ private final transient LongWritable recordCounter = new LongWritable();
 
   protected transient long numRows = 0;
   protected transient long cntr = 1;
+  protected transient long logEveryNRows = 0;
 
   // input path --> {operator --> context}
   private final Map<Path, Map<Operator<?>, MapOpCtx>> opCtxMap =
@@ -426,6 +428,8 @@ public void initializeMapOperator(Configuration hconf) throws HiveException {
     statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
 
     numRows = 0;
+    cntr = 1;
+    logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
 
     String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     if (context != null && !context.isEmpty()) {
@@ -521,7 +525,11 @@ protected final void rowsForwarded(int childrenDone, int rows) {
     numRows += rows;
     if (isLogInfoEnabled) {
       while (numRows >= cntr) {
-        cntr *= 10;
+        cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
+        if (cntr < 0 || numRows < 0) {
+          cntr = 1;
+          numRows = 0;
+        }
         LOG.info(toString() + ": records read - " + numRows);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index ee86c2c..468d87f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -152,6 +152,7 @@
 
   protected transient long numRows = 0;
   protected transient long cntr = 1;
+  protected transient long logEveryNRows = 0;
   private final transient LongWritable recordCounter = new LongWritable();
 
   @Override
@@ -160,6 +161,8 @@
 
     try {
       numRows = 0;
+      cntr = 1;
+      logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
 
       String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
       if (context != null && !context.isEmpty()) {
@@ -531,7 +534,11 @@ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws
       numRows++;
       if (isLogInfoEnabled) {
         if (numRows == cntr) {
-          cntr *= 10;
+          cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
+          if (cntr < 0 || numRows < 0) {
+            cntr = 0;
+            numRows = 1;
+          }
           LOG.info(toString() + ": records written - " + numRows);
         }
       }
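
Note on the counter update (not part of the patch): with hive.log.every.n.records left at its default of 0, the operators keep the old behavior and log at rows 1, 10, 100, 1000, and so on; any value n > 0 switches them to logging every n rows. The guard that resets the counters when either value goes negative handles long overflow from cntr * 10 or numRows + logEveryNRows. The standalone sketch below replays the update rule from FileSinkOperator.process() to show the two cadences; the class name and the 5000-row limit are illustrative only.

// Illustrative sketch only -- not part of the patch. Replays the threshold
// update from FileSinkOperator.process() for both settings of
// hive.log.every.n.records.
public class LogIntervalDemo {
  public static void main(String[] args) {
    for (long logEveryNRows : new long[] {0L, 1000L}) {
      long numRows = 0;
      long cntr = 1;
      System.out.println("hive.log.every.n.records = " + logEveryNRows);
      while (numRows < 5000) {
        if (++numRows == cntr) {
          // Exponential growth when the setting is 0, fixed stride of n otherwise.
          cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
          if (cntr < 0 || numRows < 0) { // overflow guard, as in the patch
            cntr = 0;
            numRows = 1;
          }
          System.out.println("  records written - " + numRows);
        }
      }
    }
  }
}

Run as written, it prints records written - 1, 10, 100, 1000 for the default and records written - 1, 1001, 2001, 3001, 4001 for n = 1000. One detail worth noting from the diff itself: after an overflow, MapOperator resets to cntr = 1 and numRows = 0 (which also terminates its while loop), whereas FileSinkOperator and ReduceSinkOperator reset to cntr = 0 and numRows = 1.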