diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index bcee201..7f4b816 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -38,8 +38,9 @@
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -89,6 +90,7 @@
   protected transient ListBucketingCtx lbCtx;
   protected transient boolean isSkewedStoredAsSubDirectories;
   private transient boolean statsCollectRawDataSize;
+  private transient boolean statsFromRecordWriter = false;

   private static final transient String[] FATAL_ERR_MSG = {
@@ -516,6 +518,8 @@ private void createBucketFiles(FSPaths fsp) throws HiveException {
       fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
           jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], reporter);
+      // If the record writer provides stats, get it from there instead of the serde
+      statsFromRecordWriter = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
       // increment the CREATED_FILES counter
       if (reporter != null) {
         reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -619,7 +623,7 @@ public void processOp(Object row, int tag) throws HiveException {
       }
       rowOutWriters = fpaths.outWriters;
-      if (conf.isGatherStats()) {
+      if (conf.isGatherStats() && !statsFromRecordWriter) {
         if (statsCollectRawDataSize) {
           SerDeStats stats = serializer.getSerDeStats();
           if (stats != null) {
@@ -630,12 +634,14 @@
       }

+      RecordWriter rowOutWriter = null;
+
       if (row_count != null) {
         row_count.set(row_count.get() + 1);
       }

       if (!multiFileSpray) {
-        rowOutWriters[0].write(recordValue);
+        rowOutWriter = rowOutWriters[0];
       } else {
         int keyHashCode = 0;
         for (int i = 0; i < partitionEval.length; i++) {
@@ -646,8 +652,9 @@
         key.setHashCode(keyHashCode);
         int bucketNum = prtner.getBucket(key, null, totalFiles);
         int idx = bucketMap.get(bucketNum);
-        rowOutWriters[idx].write(recordValue);
+        rowOutWriter = rowOutWriters[idx];
       }
+      rowOutWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException(e);
     } catch (SerDeException e) {
@@ -864,6 +871,27 @@ public void closeOp(boolean abort) throws HiveException {
     if (!abort) {
       for (FSPaths fsp : valToPaths.values()) {
         fsp.closeWriters(abort);
+
+        // Before closing the operator, check whether statistics gathering was requested
+        // and is provided by the record writer. This is different from the statistics
+        // gathering done in processOp(): there, serde statistics are gathered for each
+        // row added and accumulated in a hashmap, which adds overhead to the actual
+        // processing of the row. But if the record writer already gathers the statistics,
+        // it can simply return the accumulated statistics, which are aggregated here in
+        // case of spray writers.
+        if (conf.isGatherStats() && statsFromRecordWriter) {
+          for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+            RecordWriter outWriter = fsp.outWriters[idx];
+            if (outWriter != null) {
+              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+              if (stats != null) {
+                fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+              }
+            }
+          }
+        }
+
         if (isNativeTable) {
           fsp.commit(fs);
         }
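
To make the new hook concrete, here is a minimal sketch of a record writer that implements the StatsProvidingRecordWriter interface introduced by this patch, and that would therefore flip statsFromRecordWriter to true in createBucketFiles(). RowCountingRecordWriter is a hypothetical class written for this note only (it is not part of the patch); it assumes the existing FileSinkOperator.RecordWriter contract of write(Writable) and close(boolean), tracks only the row count, and leaves the raw data size at its default of zero. It also relies on the setRowCount() method added to SerDeStats later in this patch.

    package org.apache.hadoop.hive.ql.io;

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.io.Writable;

    // Hypothetical illustration only (not part of this patch): wraps any delegate
    // RecordWriter and counts the rows passed through it.
    public class RowCountingRecordWriter implements StatsProvidingRecordWriter {

      private final RecordWriter delegate;
      private final SerDeStats stats = new SerDeStats();
      private long rowCount = 0;

      public RowCountingRecordWriter(RecordWriter delegate) {
        this.delegate = delegate;
      }

      @Override
      public void write(Writable row) throws IOException {
        delegate.write(row);
        rowCount++;
      }

      @Override
      public void close(boolean abort) throws IOException {
        delegate.close(abort);
      }

      @Override
      public SerDeStats getStats() {
        // closeOp() calls getStats() after closing the writers and adds the values
        // to fsp.stat under the StatsSetupConst keys.
        stats.setRowCount(rowCount);
        return stats;
      }
    }
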
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordWriter.java
new file mode 100644
index 0000000..536dc94
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordWriter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+
+/**
+ * If a file format internally gathers statistics (like ORC) while writing, then
+ * it can expose those statistics through this record writer interface. Writer-side
+ * statistics are useful for updating the metastore with table/partition level
+ * statistics.
+ * StatsProvidingRecordWriter.
+ *
+ */
+public interface StatsProvidingRecordWriter extends RecordWriter {
+  SerDeStats getStats();
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 90260fd..37d7d86 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -39,6 +39,19 @@
   long getNumberOfRows();

   /**
+   * Get the deserialized data size of the file
+   * @return raw data size
+   */
+  long getRawDataSize();
+
+  /**
+   * Get the deserialized data size of the specified columns
+   * @param colNames
+   * @return raw data size of columns
+   */
+  long getRawDataSizeOfColumns(List<String> colNames);
+
+  /**
    * Get the user metadata keys.
    * @return the set of metadata keys
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 8e74b91..591a238 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -47,4 +47,22 @@
    * @throws IOException
    */
  void close() throws IOException;
+
+  /**
+   * Return the deserialized data size. Raw data size will be computed when
+   * writing the file footer. Hence, the raw data size value will be available
+   * only after closing the writer.
+   *
+   * @return raw data size
+   */
+  long getRawDataSize();
+
+  /**
+   * Return the number of rows in the file. Row count gets updated when flushing
+   * the stripes. To get an accurate row count, this method should be called
+   * after closing the writer.
+   *
+   * @return row count
+   */
+  long getNumberOfRows();
 }
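
As a rough sketch of how these Writer methods could feed the statistics path in FileSinkOperator, the hypothetical class below copies the ORC writer's footer statistics into a SerDeStats object after the file is closed. It is an illustration only and not part of the patch; the actual wiring would belong to ORC's record writer in OrcOutputFormat, which this patch does not show, and it uses the rowCount field added to SerDeStats in the change that follows.

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.io.Writable;

    // Simplified, hypothetical sketch of an ORC-backed StatsProvidingRecordWriter.
    class SketchOrcRecordWriter implements StatsProvidingRecordWriter {

      private final Writer writer;              // org.apache.hadoop.hive.ql.io.orc.Writer
      private final SerDeStats stats = new SerDeStats();

      SketchOrcRecordWriter(Writer writer) {
        this.writer = writer;
      }

      @Override
      public void write(Writable row) throws IOException {
        // The real ORC record writer unwraps the serialized row before handing it
        // to the ORC writer; that detail is elided here.
        writer.addRow(row);
      }

      @Override
      public void close(boolean abort) throws IOException {
        writer.close();
      }

      @Override
      public SerDeStats getStats() {
        // Both values are final only after close(): raw data size is computed while
        // writing the footer, and row count is updated as stripes are flushed.
        stats.setRawDataSize(writer.getRawDataSize());
        stats.setRowCount(writer.getNumberOfRows());
        return stats;
      }
    }
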
diff --git serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
index 1c09dc3..6cf2ccd 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
@@ -27,9 +27,11 @@
   // currently we support only raw data size stat
   private long rawDataSize;
+  private long rowCount;

   public SerDeStats() {
     rawDataSize = 0;
+    rowCount = 0;
   }

   /**
@@ -48,4 +50,20 @@
   public void setRawDataSize(long uSize) {
     rawDataSize = uSize;
   }
+
+  /**
+   * Return the row count
+   * @return row count
+   */
+  public long getRowCount() {
+    return rowCount;
+  }
+
+  /**
+   * Set the row count
+   * @param rowCount - count of rows
+   */
+  public void setRowCount(long rowCount) {
+    this.rowCount = rowCount;
+  }
 }
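
With rowCount alongside rawDataSize, per-writer statistics can be summed the same way closeOp() aggregates them across spray writers. The merge helper below is a hypothetical illustration written for this note, not part of the patch.

    import org.apache.hadoop.hive.serde2.SerDeStats;

    // Hypothetical helper: combine the SerDeStats returned by several
    // StatsProvidingRecordWriter instances into a single total.
    public final class SerDeStatsMerger {

      private SerDeStatsMerger() {
      }

      public static SerDeStats merge(SerDeStats... perWriterStats) {
        long rawDataSize = 0;
        long rowCount = 0;
        for (SerDeStats stats : perWriterStats) {
          if (stats == null) {
            continue;
          }
          rawDataSize += stats.getRawDataSize();
          rowCount += stats.getRowCount();
        }
        SerDeStats total = new SerDeStats();
        total.setRawDataSize(rawDataSize);
        total.setRowCount(rowCount);
        return total;
      }
    }
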