diff --git contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java index 5d0cdd4..bc601c4 100644 --- contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java +++ contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java @@ -24,7 +24,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; @@ -53,13 +53,13 @@ * Base64RecordWriter. * */ - public static class Base64RecordWriter implements RecordWriter, + public static class Base64RecordWriter implements FSRecordWriter, JobConfigurable { - RecordWriter writer; + FSRecordWriter writer; BytesWritable bytesWritable; - public Base64RecordWriter(RecordWriter writer) { + public Base64RecordWriter(FSRecordWriter writer) { this.writer = writer; bytesWritable = new BytesWritable(); } @@ -119,7 +119,7 @@ public void configure(JobConf job) { } @Override - public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java index be1210e..6d383b5 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Text; @@ -71,7 +71,7 @@ } @Override - public RecordWriter getHiveRecordWriter( + public FSRecordWriter getHiveRecordWriter( final JobConf jc, final Path finalOutPath, Class valueClass, @@ -120,7 +120,7 @@ public RecordWriter getHiveRecordWriter( ++i; } - return new RecordWriter() { + return new FSRecordWriter() { @Override public void close(boolean abort) throws IOException { diff --git hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java index 1447a56..34df5ae 100644 --- hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java +++ hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.AuthorizationException; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -285,7 +286,7 @@ public void checkOutputSpecs(FileSystem fs, 
JobConf jobconf) * org.apache.hadoop.util.Progressable) */ @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + public FSRecordWriter getHiveRecordWriter( JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) diff --git hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java index 46eb157..94ba9b5 100644 --- hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java +++ hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -40,7 +41,7 @@ HiveOutputFormat, Put> { @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + public FSRecordWriter getHiveRecordWriter( JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index bcee201..8934804 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -35,11 +35,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; +import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat; import org.apache.hadoop.hive.ql.io.HivePartitioner; +import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -84,11 +86,13 @@ protected transient int dpStartCol; // start column # for DP columns protected transient List dpVals; // array of values corresponding to DP columns protected transient List dpWritables; - protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters + protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters protected transient int maxPartitions; protected transient ListBucketingCtx lbCtx; protected transient boolean isSkewedStoredAsSubDirectories; private transient boolean statsCollectRawDataSize; + private transient boolean[] statsFromRecordWriter; + private transient boolean isCollectRWStats; private static final transient String[] FATAL_ERR_MSG = { @@ -96,22 +100,12 @@ "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode." }; - /** - * RecordWriter. 
- * - */ - public static interface RecordWriter { - void write(Writable w) throws IOException; - - void close(boolean abort) throws IOException; - } - public class FSPaths implements Cloneable { Path tmpPath; Path taskOutputTempPath; Path[] outPaths; Path[] finalPaths; - RecordWriter[] outWriters; + FSRecordWriter[] outWriters; Stat stat; public FSPaths() { @@ -122,7 +116,7 @@ public FSPaths(Path specPath) { taskOutputTempPath = Utilities.toTaskTempPath(specPath); outPaths = new Path[numFiles]; finalPaths = new Path[numFiles]; - outWriters = new RecordWriter[numFiles]; + outWriters = new FSRecordWriter[numFiles]; stat = new Stat(); } @@ -166,11 +160,11 @@ public Path getFinalPath(String taskId, Path tmpPath, String extension) { } } - public void setOutWriters(RecordWriter[] out) { + public void setOutWriters(FSRecordWriter[] out) { outWriters = out; } - public RecordWriter[] getOutWriters() { + public FSRecordWriter[] getOutWriters() { return outWriters; } @@ -324,6 +318,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { isCompressed = conf.getCompressed(); parent = Utilities.toTempPath(conf.getDirName()); statsCollectRawDataSize = conf.isStatsCollectRawDataSize(); + statsFromRecordWriter = new boolean[numFiles]; serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); serializer.initialize(null, conf.getTableInfo().getProperties()); @@ -516,6 +511,8 @@ private void createBucketFiles(FSPaths fsp) throws HiveException { fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter( jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], reporter); + // If the record writer provides stats, get them from there instead of the SerDe + statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter; // increment the CREATED_FILES counter if (reporter != null) { reporter.incrCounter(ProgressCounter.CREATED_FILES, 1); @@ -619,7 +616,11 @@ public void processOp(Object row, int tag) throws HiveException { } rowOutWriters = fpaths.outWriters; - if (conf.isGatherStats()) { + // check if all record writers implement statistics. if at least one RW + // doesn't implement the stats interface, fall back to the conventional way + // of gathering stats + isCollectRWStats = areAllTrue(statsFromRecordWriter); + if (conf.isGatherStats() && !isCollectRWStats) { if (statsCollectRawDataSize) { SerDeStats stats = serializer.getSerDeStats(); if (stats != null) { @@ -630,12 +631,14 @@ public void processOp(Object row, int tag) throws HiveException { } + FSRecordWriter rowOutWriter = null; + if (row_count != null) { row_count.set(row_count.get() + 1); } if (!multiFileSpray) { - rowOutWriters[0].write(recordValue); + rowOutWriter = rowOutWriters[0]; } else { int keyHashCode = 0; for (int i = 0; i < partitionEval.length; i++) { @@ -646,8 +649,9 @@ public void processOp(Object row, int tag) throws HiveException { key.setHashCode(keyHashCode); int bucketNum = prtner.getBucket(key, null, totalFiles); int idx = bucketMap.get(bucketNum); - rowOutWriters[idx].write(recordValue); + rowOutWriter = rowOutWriters[idx]; } + rowOutWriter.write(recordValue); } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { @@ -655,6 +659,15 @@ public void processOp(Object row, int tag) throws HiveException { } } + private boolean areAllTrue(boolean[] statsFromRW) { + for(boolean b : statsFromRW) { + if (!b) { + return false; + } + } + return true; + } + /** * Lookup list bucketing path.
* @param lbDirName @@ -864,6 +877,27 @@ public void closeOp(boolean abort) throws HiveException { if (!abort) { for (FSPaths fsp : valToPaths.values()) { fsp.closeWriters(abort); + + // before closing the operator, check if statistics gathering is requested + // and is provided by the record writer. This is different from the statistics + // gathering done in processOp(). In processOp(), SerDe statistics about each + // row are gathered and accumulated in a hashmap, which adds overhead to the + // actual processing of each row. But if the record writer already gathers the + // statistics, it can simply return the accumulated statistics, which will be + // aggregated across writers in the spray case + if (conf.isGatherStats() && isCollectRWStats) { + for (int idx = 0; idx < fsp.outWriters.length; idx++) { + FSRecordWriter outWriter = fsp.outWriters[idx]; + if (outWriter != null) { + SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); + if (stats != null) { + fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + } + } + } + } + if (isNativeTable) { fsp.commit(fs); } @@ -934,7 +968,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job); } else { - hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); + hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); } } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a68e13a..4bfc55e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -102,12 +102,12 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -1694,7 +1694,7 @@ private static void createEmptyBuckets(Configuration hconf, ArrayList pa for (String p : paths) { Path path = new Path(p); - RecordWriter writer = HiveFileFormatUtils.getRecordWriter( + FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter( jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path, reporter); writer.close(false); @@ -2853,7 +2853,7 @@ private static Path createEmptyFile(String hiveScratchDir, Path newFilePath = new Path(newFile); String onefile = newPath.toString(); - RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, + FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, props, null); if (dummyRow) { // empty files are omitted at CombineHiveInputFormat.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java index 19538bf..9b2babc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java @@ -28,8 +28,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PTFDeserializer; @@ -240,7 +240,7 @@ public static TableDesc createTableDesc(StructObjectInspector oI) { } - private static class PTFRecordWriter implements RecordWriter { + private static class PTFRecordWriter implements FSRecordWriter { BytesWritable EMPTY_KEY = new BytesWritable(); SequenceFile.Writer outStream; @@ -262,7 +262,7 @@ public void close(boolean abort) throws IOException { extends HiveSequenceFileOutputFormat { @Override - public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java index d97beb5..3b0bc2a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java @@ -30,8 +30,8 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -105,7 +105,7 @@ int acutalSplitNum = 0; int currentSplitPointer = 0; org.apache.hadoop.mapred.RecordReader rr = null; // record reader - RecordWriter rw = null; + FSRecordWriter rw = null; InputFormat inputFormat = null; InputSplit[] inputSplits = null; private ROW dummyRow = null; @@ -531,7 +531,7 @@ protected void setupWriter() throws HiveException { } - protected RecordWriter getRecordWriter() { + protected FSRecordWriter getRecordWriter() { return rw; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java new file mode 100644 index 0000000..83ac010 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.io.Writable; + +/** + * Record writer used by the file sink operator. + * + * FSRecordWriter. + * + */ +public interface FSRecordWriter { + void write(Writable w) throws IOException; + + void close(boolean abort) throws IOException; + + /** + * If a file format internally gathers statistics (like ORC) while writing, then + * it can expose the statistics through this record writer interface. Writer-side + * statistics are useful for updating the metastore with table/partition level + * statistics. + * StatsProvidingRecordWriter. + * + */ + public interface StatsProvidingRecordWriter extends FSRecordWriter { + /** + * Returns the statistics information. + * @return SerDeStats + */ + SerDeStats getStats(); + } + +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java index e1454c1..6768292 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -43,7 +42,7 @@ /** * create the final out file, and output row by row.
After one row is * appended, a configured row separator is appended - * + * * @param jc * the job configuration file * @param outPath @@ -59,14 +58,14 @@ * @return the RecordWriter */ @Override - public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { FileSystem fs = outPath.getFileSystem(jc); final OutputStream outStream = fs.create(outPath); - return new RecordWriter() { + return new FSRecordWriter() { public void write(Writable r) throws IOException { if (r instanceof Text) { Text tr = (Text) r; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 3719326..4be56f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat; @@ -246,7 +245,7 @@ private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf, return true; } - public static RecordWriter getHiveRecordWriter(JobConf jc, + public static FSRecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo, Class outputClass, FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException { boolean storagehandlerofhivepassthru = false; @@ -287,7 +286,7 @@ public static RecordWriter getHiveRecordWriter(JobConf jc, } } - public static RecordWriter getRecordWriter(JobConf jc, + public static FSRecordWriter getRecordWriter(JobConf jc, HiveOutputFormat hiveOutputFormat, final Class valueClass, boolean isCompressed, Properties tableProp, Path outPath, Reporter reporter diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java index e9add62..ad6e4ba 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; @@ -39,7 +38,7 @@ /** * HiveIgnoreKeyTextOutputFormat replaces key with null before feeding the to TextOutputFormat.RecordWriter. - * + * */ public class HiveIgnoreKeyTextOutputFormat extends TextOutputFormat implements HiveOutputFormat { @@ -47,7 +46,7 @@ /** * create the final out file, and output row by row. 
After one row is * appended, a configured row separator is appended - * + * * @param jc * the job configuration file * @param outPath @@ -63,7 +62,7 @@ * @return the RecordWriter */ @Override - public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { int rowSeparator = 0; @@ -79,7 +78,7 @@ public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, FileSystem fs = outPath.getFileSystem(jc); final OutputStream outStream = Utilities.createCompressedStream(jc, fs .create(outPath), isCompressed); - return new RecordWriter() { + return new FSRecordWriter() { public void write(Writable r) throws IOException { if (r instanceof Text) { Text tr = (Text) r; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java index 856c85e..ef6a982 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; @@ -48,7 +47,7 @@ private boolean keyIsText; @Override - public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { @@ -58,7 +57,7 @@ public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, keyWritable = new HiveKey(); keyIsText = valueClass.equals(Text.class); - return new RecordWriter() { + return new FSRecordWriter() { public void write(Writable r) throws IOException { if (keyIsText) { Text text = (Text) r; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java index 0e80b83..e5ac805 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java @@ -22,7 +22,6 @@ import java.util.Properties; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -58,7 +57,7 @@ * progress used for status report * @return the RecordWriter for the output file */ - RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, final Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java index c113f54..1fb5898 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java @@ -49,7 +49,7 @@ "org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat"; public static final String 
HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY = - "hive.passthrough.storagehandler.of"; + "hive.passthrough.storagehandler.of"; public HivePassThroughOutputFormat() { //construct this class through ReflectionUtils from FileSinkOperator @@ -99,7 +99,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException } @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + public FSRecordWriter getHiveRecordWriter( JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { if (this.initialized == false) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java index 90f592b..2186944 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java @@ -20,13 +20,12 @@ import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; public class HivePassThroughRecordWriter , V extends Writable> -implements RecordWriter { +implements FSRecordWriter { private final org.apache.hadoop.mapred.RecordWriter mWriter; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java index 80f93f9..0cf00e9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.SequenceFile; @@ -56,7 +55,7 @@ * @return the RecordWriter for the output file */ @Override - public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { @@ -64,7 +63,7 @@ public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc, fs, finalOutPath, BytesWritable.class, valueClass, isCompressed); - return new RecordWriter() { + return new FSRecordWriter() { public void write(Writable r) throws IOException { outStream.append(EMPTY_KEY, r); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java index ddf2321..e21f9fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java @@ -118,7 +118,7 @@ public void write(WritableComparable key, BytesRefArrayWritable value) * @throws IOException */ @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + public FSRecordWriter getHiveRecordWriter( JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { @@ -135,7 +135,7 @@ public void write(WritableComparable key, BytesRefArrayWritable value) 
(jc, finalOutPath.getFileSystem(jc), finalOutPath, isCompressed); - return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { + return new FSRecordWriter() { public void write(Writable r) throws IOException { outWriter.append(r); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java index 9d6c7ec..8d75b44 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java @@ -17,6 +17,14 @@ */ package org.apache.hadoop.hive.ql.io.avro; +import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC; +import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC; +import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL; +import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY; + +import java.io.IOException; +import java.util.Properties; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; @@ -24,7 +32,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; @@ -36,14 +44,6 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; -import java.io.IOException; -import java.util.Properties; - -import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC; -import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC; -import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL; -import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY; - /** * Write to an Avro file from a Hive process. */ @@ -51,7 +51,7 @@ implements HiveOutputFormat { @Override - public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, + public FSRecordWriter getHiveRecordWriter(JobConf jobConf, Path path, Class valueClass, boolean isCompressed, Properties properties, Progressable progressable) throws IOException { Schema schema; @@ -62,7 +62,7 @@ } GenericDatumWriter gdw = new GenericDatumWriter(schema); DataFileWriter dfw = new DataFileWriter(gdw); - + if (isCompressed) { int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL); String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java index 2fe0a72..73e1cdd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java @@ -18,18 +18,18 @@ package org.apache.hadoop.hive.ql.io.avro; +import java.io.IOException; + import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; import org.apache.hadoop.io.Writable; -import java.io.IOException; - /** * Write an Avro GenericRecord to an Avro data file. 
*/ -public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{ +public class AvroGenericRecordWriter implements FSRecordWriter { final private DataFileWriter dfw; public AvroGenericRecordWriter(DataFileWriter dfw) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index c80fb02..6f8ca73 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -17,10 +17,9 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -45,7 +44,7 @@ private static class OrcRecordWriter implements RecordWriter, - FileSinkOperator.RecordWriter { + FSRecordWriter { private Writer writer = null; private final Path path; private final OrcFile.WriterOptions options; @@ -105,7 +104,7 @@ public void close(boolean b) throws IOException { } @Override - public FileSinkOperator.RecordWriter + public FSRecordWriter getHiveRecordWriter(JobConf conf, Path path, Class valueClass, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 90260fd..37d7d86 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -39,6 +39,19 @@ long getNumberOfRows(); /** + * Get the deserialized data size of the file + * @return raw data size + */ + long getRawDataSize(); + + /** + * Get the deserialized data size of the specified columns + * @param colNames + * @return raw data size of columns + */ + long getRawDataSizeOfColumns(List colNames); + + /** * Get the user metadata keys. * @return the set of metadata keys */ diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index c454f32..e034ca0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -343,4 +343,14 @@ public RecordReader rows(long offset, long length, boolean[] include, include, footer.getRowIndexStride(), sarg, columnNames); } + @Override + public long getRawDataSize() { + return 0; + } + + @Override + public long getRawDataSizeOfColumns(List colNames) { + return 0; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java index 8e74b91..591a238 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java @@ -47,4 +47,22 @@ * @throws IOException */ void close() throws IOException; + + /** + * Return the deserialized data size. Raw data size will be computed when + * writing the file footer. Hence the raw data size value will be available only + * after closing the writer. + * + * @return raw data size + */ + long getRawDataSize(); + + /** + * Return the number of rows in the file. The row count gets updated when flushing + * the stripes. To get an accurate row count, this method should be called after + * closing the writer.
+ * + * @return row count + */ + long getNumberOfRows(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 44961ce..c0b55ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -1871,4 +1871,14 @@ public void close() throws IOException { rawWriter.close(); } } + + @Override + public long getRawDataSize() { + return 0; + } + + @Override + public long getNumberOfRows() { + return 0; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 34b2305..1f7ed42 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -521,7 +521,7 @@ public void testInOutFormat() throws Exception { } SerDe serde = new OrcSerde(); HiveOutputFormat outFormat = new OrcOutputFormat(); - FileSinkOperator.RecordWriter writer = + FSRecordWriter writer = outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true, properties, Reporter.NULL); writer.write(serde.serialize(new MyRow(1,2), inspector)); @@ -686,7 +686,7 @@ public void testEmptyFile() throws Exception { JobConf job = new JobConf(conf); Properties properties = new Properties(); HiveOutputFormat outFormat = new OrcOutputFormat(); - FileSinkOperator.RecordWriter writer = + FSRecordWriter writer = outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true, properties, Reporter.NULL); writer.close(true); @@ -731,7 +731,7 @@ public void testDefaultTypes() throws Exception { } SerDe serde = new OrcSerde(); HiveOutputFormat outFormat = new OrcOutputFormat(); - FileSinkOperator.RecordWriter writer = + FSRecordWriter writer = outFormat.getHiveRecordWriter(conf, testFilePath, StringRow.class, true, properties, Reporter.NULL); writer.write(serde.serialize(new StringRow("owen"), inspector)); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java index 5beea27..77bc218 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java @@ -1,7 +1,10 @@ package org.apache.hadoop.hive.ql.io.udf; +import java.io.IOException; +import java.util.Properties; + import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.FSRecordWriter; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; @@ -11,27 +14,24 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; -import java.io.IOException; -import java.util.Properties; - public class Rot13OutputFormat extends HiveIgnoreKeyTextOutputFormat { @Override - public RecordWriter + public FSRecordWriter getHiveRecordWriter(JobConf jc, 
Path outPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { - final RecordWriter result = + final FSRecordWriter result = super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed, tableProperties,progress); final Reporter reporter = (Reporter) progress; reporter.setStatus("got here"); System.out.println("Got a reporter " + reporter); - return new RecordWriter() { + return new FSRecordWriter() { @Override public void write(Writable w) throws IOException { if (w instanceof Text) { diff --git serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java index 1c09dc3..6cf2ccd 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java +++ serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java @@ -27,9 +27,11 @@ // currently we support only raw data size stat private long rawDataSize; + private long rowCount; public SerDeStats() { rawDataSize = 0; + rowCount = 0; } /** @@ -48,4 +50,20 @@ public void setRawDataSize(long uSize) { rawDataSize = uSize; } + /** + * Return the row count + * @return row count + */ + public long getRowCount() { + return rowCount; + } + + /** + * Set the row count + * @param rowCount - count of rows + */ + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + }
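
Note (editor's addition, not part of the patch): the sketch below shows how a file format could opt into the writer-side statistics path introduced above. It assumes only the FSRecordWriter, FSRecordWriter.StatsProvidingRecordWriter, and SerDeStats APIs from this patch; the class name and the byte-counting heuristic are hypothetical.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * Hypothetical text writer that tracks its own statistics while writing, so that
 * FileSinkOperator can read them once at close time instead of asking the SerDe
 * for per-row stats.
 */
public class StatsTrackingTextWriter implements StatsProvidingRecordWriter {

  private final OutputStream out;
  private final SerDeStats stats = new SerDeStats();
  private long rowCount;
  private long rawDataSize;

  public StatsTrackingTextWriter(OutputStream out) {
    this.out = out;
  }

  @Override
  public void write(Writable w) throws IOException {
    byte[] bytes;
    if (w instanceof Text) {
      Text t = (Text) w;
      bytes = new byte[t.getLength()];
      System.arraycopy(t.getBytes(), 0, bytes, 0, t.getLength());
    } else {
      bytes = w.toString().getBytes("UTF-8");
    }
    out.write(bytes);
    out.write('\n');
    // accumulate writer-side statistics as rows stream through
    rowCount++;
    rawDataSize += bytes.length;
  }

  @Override
  public void close(boolean abort) throws IOException {
    out.close();
  }

  @Override
  public SerDeStats getStats() {
    // FileSinkOperator.closeOp() adds these values to StatsSetupConst.RAW_DATA_SIZE
    // and StatsSetupConst.ROW_COUNT when every writer of the sink provides stats
    stats.setRawDataSize(rawDataSize);
    stats.setRowCount(rowCount);
    return stats;
  }
}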
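
A second illustrative sketch (also an editor's addition, not in the patch): aggregating per-writer statistics the way the new closeOp() block does for spray writers, with a null return signalling that the caller must fall back to SerDe-based collection because at least one writer does not implement the interface. The helper class and method names are invented for illustration.

import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.serde2.SerDeStats;

/** Hypothetical helper that merges writer-side stats across all writers of a sink. */
public final class WriterStatsAggregator {

  private WriterStatsAggregator() {
  }

  /**
   * Returns combined statistics if every writer provides them, or null to signal
   * that the caller should gather statistics through the SerDe instead.
   */
  public static SerDeStats aggregate(FSRecordWriter[] writers) {
    long rows = 0;
    long rawSize = 0;
    for (FSRecordWriter w : writers) {
      if (!(w instanceof StatsProvidingRecordWriter)) {
        return null; // mirrors the areAllTrue() fallback in FileSinkOperator
      }
      SerDeStats s = ((StatsProvidingRecordWriter) w).getStats();
      if (s != null) {
        rows += s.getRowCount();
        rawSize += s.getRawDataSize();
      }
    }
    SerDeStats total = new SerDeStats();
    total.setRowCount(rows);
    total.setRawDataSize(rawSize);
    return total;
  }
}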