commit 14e1ff3f73f3b0547e5b5650fbe6b56475731797
Author: Ashutosh Chauhan
Date:   Tue Apr 22 17:20:41 2014 -0700

    Make Hive output format backward compatible

diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java b/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
index bc601c4..5d0cdd4 100644
--- a/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
@@ -24,7 +24,7 @@
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -53,13 +53,13 @@
  * Base64RecordWriter.
  *
  */
-  public static class Base64RecordWriter implements FSRecordWriter,
+  public static class Base64RecordWriter implements RecordWriter,
       JobConfigurable {
-    FSRecordWriter writer;
+    RecordWriter writer;
     BytesWritable bytesWritable;

-    public Base64RecordWriter(FSRecordWriter writer) {
+    public Base64RecordWriter(RecordWriter writer) {
       this.writer = writer;
       bytesWritable = new BytesWritable();
     }
@@ -119,7 +119,7 @@ public void configure(JobConf job) {
   }

   @Override
-  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
index 6d383b5..be1210e 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Text;
@@ -71,7 +71,7 @@
   }

   @Override
-  public FSRecordWriter getHiveRecordWriter(
+  public RecordWriter getHiveRecordWriter(
       final JobConf jc,
       final Path finalOutPath,
       Class valueClass,
@@ -120,7 +120,7 @@ public FSRecordWriter getHiveRecordWriter(
       ++i;
     }

-    return new FSRecordWriter() {
+    return new RecordWriter() {

       @Override
       public void close(boolean abort) throws IOException {
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
index 40927f6..d710de3 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
@@ -22,7 +22,7 @@
 import java.util.Properties;

 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -36,20 +36,20 @@
     extends HiveIgnoreKeyTextOutputFormat {

   @Override
-  public FSRecordWriter
+  public RecordWriter
     getHiveRecordWriter(JobConf jc,
                         Path outPath,
                         Class valueClass,
                         boolean isCompressed,
                         Properties tableProperties,
                         Progressable progress) throws IOException {
-    final FSRecordWriter result =
+    final RecordWriter result =
       super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed,
         tableProperties,progress);
     final Reporter reporter = (Reporter) progress;
     reporter.setStatus("got here");
     System.out.println("Got a reporter " + reporter);
-    return new FSRecordWriter() {
+    return new RecordWriter() {
       @Override
       public void write(Writable w) throws IOException {
         if (w instanceof Text) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 6af6b2d..1dde78e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -35,8 +35,7 @@
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -88,7 +87,7 @@
   protected transient int dpStartCol; // start column # for DP columns
   protected transient List dpVals; // array of values corresponding to DP columns
   protected transient List dpWritables;
-  protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters
+  protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
   protected transient int maxPartitions;
   protected transient ListBucketingCtx lbCtx;
   protected transient boolean isSkewedStoredAsSubDirectories;
@@ -117,7 +116,7 @@
     Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
-    FSRecordWriter[] outWriters;
+    RecordWriter[] outWriters;
     Stat stat;

     public FSPaths() {
@@ -128,7 +127,7 @@ public FSPaths(Path specPath) {
       taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
-      outWriters = new FSRecordWriter[numFiles];
+      outWriters = new RecordWriter[numFiles];
       stat = new Stat();
     }

@@ -150,11 +149,11 @@ public Path getFinalPath(String taskId, Path tmpPath, String extension) {
       }
     }

-    public void setOutWriters(FSRecordWriter[] out) {
+    public void setOutWriters(RecordWriter[] out) {
       outWriters = out;
     }

-    public FSRecordWriter[] getOutWriters() {
+    public RecordWriter[] getOutWriters() {
       return outWriters;
     }

@@ -599,7 +598,7 @@ public void processOp(Object row, int tag) throws HiveException {
       }

-    FSRecordWriter rowOutWriter = null;
+    RecordWriter rowOutWriter = null;

     if (row_count != null) {
       row_count.set(row_count.get() + 1);
@@ -757,7 +756,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive
       // since we are closing the previous fsp's record writers, we need to see if we can get
       // stats from the record writer and store in the previous fsp that is cached
       if (conf.isGatherStats() && isCollectRWStats) {
-        FSRecordWriter outWriter = prevFsp.outWriters[0];
+        RecordWriter outWriter = prevFsp.outWriters[0];
         if (outWriter != null) {
           SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
           if (stats != null) {
@@ -851,7 +850,7 @@ public void closeOp(boolean abort) throws HiveException {
       // accumulated statistics which will be aggregated in case of spray writers
       if (conf.isGatherStats() && isCollectRWStats) {
         for (int idx = 0; idx < fsp.outWriters.length; idx++) {
-          FSRecordWriter outWriter = fsp.outWriters[idx];
+          RecordWriter outWriter = fsp.outWriters[idx];
           if (outWriter != null) {
             SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
             if (stats != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index c52a093..1ca64e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -98,7 +98,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -111,13 +110,13 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -1774,7 +1773,7 @@ private static void createEmptyBuckets(Configuration hconf, ArrayList pa
     for (String p : paths) {
       Path path = new Path(p);
-      FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed,
           tableInfo.getProperties(), path, reporter);
       writer.close(false);
@@ -3058,7 +3057,7 @@ private static Path createEmptyFile(Path hiveScratchDir,
     String newFile = newDir + File.separator + "emptyFile";
     Path newFilePath = new Path(newFile);

-    FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
         Text.class, false, props, null);
     if (dummyRow) {
       // empty files are omitted at CombineHiveInputFormat.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
index 4ab5516..02956f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
@@ -28,8 +28,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -240,7 +240,7 @@ public static TableDesc createTableDesc(StructObjectInspector oI) {
   }

-  private static class PTFRecordWriter implements FSRecordWriter {
+  private static class PTFRecordWriter implements RecordWriter {
     BytesWritable EMPTY_KEY = new BytesWritable();

     SequenceFile.Writer outStream;
@@ -249,10 +249,12 @@ public PTFRecordWriter(SequenceFile.Writer outStream) {
       this.outStream = outStream;
     }

+    @Override
     public void write(Writable r) throws IOException {
       outStream.append(EMPTY_KEY, r);
     }

+    @Override
     public void close(boolean abort) throws IOException {
       outStream.close();
     }
@@ -262,7 +264,7 @@ public void close(boolean abort) throws IOException {
       extends HiveSequenceFileOutputFormat {

     @Override
-    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+    public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
         Class valueClass, boolean isCompressed, Properties tableProperties,
         Progressable progress) throws IOException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 768467e..5271e91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -30,8 +30,8 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -106,7 +106,7 @@
   int acutalSplitNum = 0;
   int currentSplitPointer = 0;
   org.apache.hadoop.mapred.RecordReader rr = null; // record reader
-  FSRecordWriter rw = null;
+  RecordWriter rw = null;
   InputFormat inputFormat = null;
   InputSplit[] inputSplits = null;
   private ROW dummyRow = null;
@@ -213,7 +213,7 @@ public ROW first() throws HiveException {
       JobConf localJc = getLocalFSJobConfClone(jc);
       if (inputSplits == null) {
         if (this.inputFormat == null) {
-          inputFormat = (InputFormat) ReflectionUtils.newInstance(
+          inputFormat = ReflectionUtils.newInstance(
               tblDesc.getInputFileFormatClass(), localJc);
         }
@@ -537,7 +537,7 @@ protected void setupWriter() throws HiveException {
   }

-  protected FSRecordWriter getRecordWriter() {
+  protected RecordWriter getRecordWriter() {
     return rw;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 551f3aa..6b330e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.Reporter;
@@ -239,6 +240,6 @@ public RecordUpdater getRecordUpdater(Path path,
    * @return a record writer
    * @throws IOException
    */
-  public FSRecordWriter getRawRecordWriter(Path path,
+  public RecordWriter getRawRecordWriter(Path path,
                                            Options options) throws IOException;
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
deleted file mode 100644
index 83ac010..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Record writer used by file sink operator.
- *
- * FSRecordWriter.
- *
- */
-public interface FSRecordWriter {
-  void write(Writable w) throws IOException;
-
-  void close(boolean abort) throws IOException;
-
-  /**
-   * If a file format internally gathers statistics (like ORC) while writing then
-   * it can expose the statistics through this record writer interface. Writer side
-   * statistics is useful for updating the metastore with table/partition level
-   * statistics.
-   * StatsProvidingRecordWriter.
-   *
-   */
-  public interface StatsProvidingRecordWriter extends FSRecordWriter{
-    /**
-     * Returns the statistics information
-     * @return SerDeStats
-     */
-    SerDeStats getStats();
-  }
-
-}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
index 6768292..1fbb276 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -58,14 +59,15 @@
    * @return the RecordWriter
    */
   @Override
-  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
       Class valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {

     FileSystem fs = outPath.getFileSystem(jc);
     final OutputStream outStream = fs.create(outPath);

-    return new FSRecordWriter() {
+    return new RecordWriter() {
+      @Override
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
           Text tr = (Text) r;
@@ -77,6 +79,7 @@ public void write(Writable r) throws IOException {
         }
       }

+      @Override
       public void close(boolean abort) throws IOException {
         outStream.close();
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 95db96b..c3a83d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -116,7 +117,7 @@ public static synchronized void registerOutputFormatSubstitute(
    * get a RealOutputFormatClassName corresponding to the HivePassThroughOutputFormat
    */
   @SuppressWarnings("unchecked")
-  public static String getRealOutputFormatClassName()
+  public static String getRealOutputFormatClassName()
   {
     return realoutputFormat;
   }
@@ -245,7 +246,7 @@ private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf,
     return true;
   }

-  public static FSRecordWriter getHiveRecordWriter(JobConf jc,
+  public static RecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class outputClass,
       FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
     boolean storagehandlerofhivepassthru = false;
@@ -286,7 +287,7 @@ public static FSRecordWriter getHiveRecordWriter(JobConf jc,
     }
   }

-  public static FSRecordWriter getRecordWriter(JobConf jc,
+  public static RecordWriter getRecordWriter(JobConf jc,
       HiveOutputFormat hiveOutputFormat,
       final Class valueClass, boolean isCompressed,
       Properties tableProp, Path outPath, Reporter reporter
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
index ad6e4ba..0444bc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.BytesWritable;
@@ -62,7 +63,7 @@
    * @return the RecordWriter
    */
   @Override
-  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
       Class valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
     int rowSeparator = 0;
@@ -78,7 +79,8 @@ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
     FileSystem fs = outPath.getFileSystem(jc);
     final OutputStream outStream = Utilities.createCompressedStream(jc, fs
         .create(outPath), isCompressed);
-    return new FSRecordWriter() {
+    return new RecordWriter() {
+      @Override
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
           Text tr = (Text) r;
@@ -92,6 +94,7 @@ public void write(Writable r) throws IOException {
         }
       }

+      @Override
       public void close(boolean abort) throws IOException {
         outStream.close();
       }
@@ -107,10 +110,12 @@ public IgnoreKeyWriter(org.apache.hadoop.mapred.RecordWriter writer) {
       this.mWriter = writer;
     }

+    @Override
     public synchronized void write(K key, V value) throws IOException {
       this.mWriter.write(null, value);
     }

+    @Override
     public void close(Reporter reporter) throws IOException {
       this.mWriter.close(reporter);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
index ef6a982..e6083fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -47,7 +48,7 @@
   private boolean keyIsText;

   @Override
-  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
@@ -57,7 +58,8 @@ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
     keyWritable = new HiveKey();
     keyIsText = valueClass.equals(Text.class);
-    return new FSRecordWriter() {
+    return new RecordWriter() {
+      @Override
       public void write(Writable r) throws IOException {
         if (keyIsText) {
           Text text = (Text) r;
@@ -73,6 +75,7 @@ public void write(Writable r) throws IOException {
         outStream.append(keyWritable, NULL_WRITABLE);
       }

+      @Override
       public void close(boolean abort) throws IOException {
         outStream.close();
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
index e5ac805..0e80b83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
@@ -22,6 +22,7 @@
 import java.util.Properties;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -57,7 +58,7 @@
    *          progress used for status report
    * @return the RecordWriter for the output file
    */
-  FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       final Class valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
index 1fb5898..0962cad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
@@ -73,8 +73,7 @@ private void createActualOF() throws IOException {
       throw new IOException(e);
     }
     OutputFormat, ? super Writable> actualOF =
-        (OutputFormat)
-          ReflectionUtils.newInstance(cls, this.getConf());
+        ReflectionUtils.newInstance(cls, this.getConf());
     this.actualOutputFormat = actualOF;
   }
@@ -99,7 +98,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException
   }

   @Override
-  public FSRecordWriter getHiveRecordWriter(
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
       JobConf jc, Path finalOutPath, Class valueClass,
       boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
     if (this.initialized == false) {
@@ -117,10 +116,12 @@ public FSRecordWriter getHiveRecordWriter(
     }
   }

+  @Override
   public Configuration getConf() {
     return conf;
   }

+  @Override
   public void setConf(Configuration config) {
     if (config.get(HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
       actualOutputFormatClass = config.get(HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
index 2186944..454c321 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
@@ -20,12 +20,13 @@
 import java.io.IOException;

+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;

 public class HivePassThroughRecordWriter , V extends Writable>
-implements FSRecordWriter {
+implements RecordWriter {

   private final org.apache.hadoop.mapred.RecordWriter mWriter;
@@ -33,11 +34,13 @@ public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter w
     this.mWriter = writer;
   }

+  @Override
   @SuppressWarnings("unchecked")
   public void write(Writable r) throws IOException {
     mWriter.write(null, (V) r);
   }

+  @Override
   public void close(boolean abort) throws IOException {
     //close with null reporter
     mWriter.close(null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
index 0cf00e9..30b4379 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -55,7 +56,7 @@
    * @return the RecordWriter for the output file
    */
   @Override
-  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
@@ -63,11 +64,13 @@ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
     final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
         fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);

-    return new FSRecordWriter() {
+    return new RecordWriter() {
+      @Override
       public void write(Writable r) throws IOException {
         outStream.append(EMPTY_KEY, r);
       }

+      @Override
       public void close(boolean abort) throws IOException {
         outStream.close();
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
index e21f9fe..fa2bbcd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
@@ -118,7 +118,7 @@ public void write(WritableComparable key, BytesRefArrayWritable value)
    * @throws IOException
    */
   @Override
-  public FSRecordWriter getHiveRecordWriter(
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
       JobConf jc, Path finalOutPath, Class valueClass,
       boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
@@ -135,11 +135,13 @@ public FSRecordWriter getHiveRecordWriter(
         (jc, finalOutPath.getFileSystem(jc), finalOutPath, isCompressed);

-    return new FSRecordWriter() {
+    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+      @Override
       public void write(Writable r) throws IOException {
         outWriter.append(r);
       }

+      @Override
       public void close(boolean abort) throws IOException {
         outWriter.close();
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordWriter.java
new file mode 100644
index 0000000..d05f8a1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordWriter.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+
+ /**
+  * If a file format internally gathers statistics (like ORC) while writing then
+  * it can expose the statistics through this record writer interface. Writer side
+  * statistics is useful for updating the metastore with table/partition level
+  * statistics.
+  * StatsProvidingRecordWriter.
+  *
+  */
+ public interface StatsProvidingRecordWriter extends RecordWriter{
+   /**
+    * Returns the statistics information
+    * @return SerDeStats
+    */
+   SerDeStats getStats();
+ }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
index 8d75b44..fba0f96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
@@ -32,7 +32,6 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
@@ -51,7 +50,7 @@
   implements HiveOutputFormat {

   @Override
-  public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
         Path path, Class valueClass, boolean isCompressed, Properties properties,
         Progressable progressable) throws IOException {
     Schema schema;
@@ -82,10 +81,12 @@ public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
       getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) {
     return new RecordWriter() {
+      @Override
       public void write(LongWritable key, AvroGenericRecordWritable value) {
         throw new RuntimeException("Should not be called");
       }

+      @Override
       public void close(Reporter reporter) {
       }
     };
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
index 73e1cdd..6077fc7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
@@ -22,14 +22,14 @@
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.io.Writable;

 /**
  * Write an Avro GenericRecord to an Avro data file.
  */
-public class AvroGenericRecordWriter implements FSRecordWriter{
+public class AvroGenericRecordWriter implements RecordWriter{
   final private DataFileWriter dfw;

   public AvroGenericRecordWriter(DataFileWriter dfw) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 46c3bcc..578d923 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -53,8 +53,7 @@
   private static class OrcRecordWriter implements RecordWriter,
-      FSRecordWriter,
-      FSRecordWriter.StatsProvidingRecordWriter {
+      StatsProvidingRecordWriter {
     private Writer writer = null;
     private final Path path;
     private final OrcFile.WriterOptions options;
@@ -178,7 +177,7 @@ private String getSettingFromPropsFallingBackToConf(String key, Properties props

   @Override
-  public FSRecordWriter
+  public StatsProvidingRecordWriter
       getHiveRecordWriter(JobConf conf,
                           Path path,
                           Class valueClass,
@@ -283,7 +282,7 @@ public RecordUpdater getRecordUpdater(Path path,
   }

   @Override
-  public FSRecordWriter getRawRecordWriter(Path path,
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path,
                                            Options options) throws IOException {
     final Path filename = AcidUtils.createFilename(path, options);
     final OrcFile.WriterOptions opts =
@@ -300,7 +299,7 @@ public FSRecordWriter getRawRecordWriter(Path path,
     opts.inspector(options.getInspector())
         .callback(watcher);
     final Writer writer = OrcFile.createWriter(filename, opts);
-    return new FSRecordWriter() {
+    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
       @Override
       public void write(Writable w) throws IOException {
         OrcStruct orc = (OrcStruct) w;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index b87c673..30c91ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -23,7 +23,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
@@ -83,7 +82,7 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
    * contains the real output format
    */
   @Override
-  public FSRecordWriter getHiveRecordWriter(
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
       final JobConf jobConf,
       final Path finalOutPath,
       final Class valueClass,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
index cd603c2..07b003a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
@@ -27,13 +27,12 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;

 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.util.ContextUtil;

 public class ParquetRecordWriterWrapper implements RecordWriter,
-  FSRecordWriter {
+  org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {

   public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class);
@@ -54,8 +53,7 @@ public ParquetRecordWriterWrapper(
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);

       LOG.info("creating real writer to write at " + name);
-      realWriter = (org.apache.hadoop.mapreduce.RecordWriter)
-          ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
+      realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
       throw new IOException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index de4867a..01c5500 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnListImpl;
@@ -31,28 +30,25 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.StringUtils;
@@ -343,6 +339,7 @@ Path getBaseDir() {
       return deltas;
     }

+    @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
       builder.append("CompactorInputSplit{base: ");
@@ -491,7 +488,7 @@ public float getProgress() throws IOException {
       implements Mapper {

     JobConf jobConf;
-    FSRecordWriter writer;
+    RecordWriter writer;

     @Override
     public void map(NullWritable key, CompactorInputSplit split,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 5664f3f..f336c01 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -64,7 +64,6 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
@@ -440,6 +439,7 @@ public MockBlock(String... hosts) {
       this.hosts = hosts;
     }

+    @Override
     public String toString() {
       StringBuilder buffer = new StringBuilder();
       buffer.append("block{offset: ");
@@ -480,6 +480,7 @@ public MockFile(String path, int blockSize, byte[] content,
       }
     }

+    @Override
     public String toString() {
       StringBuilder buffer = new StringBuilder();
       buffer.append("mockFile{path: ");
@@ -564,6 +565,7 @@ public void setBlocks(MockBlock... blocks) {
       }
     }

+    @Override
     public void close() throws IOException {
       super.close();
       DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream();
@@ -581,6 +583,7 @@ public MockFileSystem() {
       // empty
     }

+    @Override
     public void initialize(URI uri, Configuration conf) {
       setConf(conf);
     }
@@ -924,7 +927,7 @@ public void testInOutFormat() throws Exception {
     }
     SerDe serde = new OrcSerde();
     HiveOutputFormat outFormat = new OrcOutputFormat();
-    FSRecordWriter writer =
+    org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer =
         outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
             properties, Reporter.NULL);
     writer.write(serde.serialize(new MyRow(1,2), inspector));
@@ -1088,7 +1091,7 @@ public void testMROutput() throws Exception {
   public void testEmptyFile() throws Exception {
     Properties properties = new Properties();
     HiveOutputFormat outFormat = new OrcOutputFormat();
-    FSRecordWriter writer =
+    org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer =
         outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
             properties, Reporter.NULL);
     writer.close(true);
@@ -1133,7 +1136,7 @@ public void testDefaultTypes() throws Exception {
     }
     SerDe serde = new OrcSerde();
     HiveOutputFormat outFormat = new OrcOutputFormat();
-    FSRecordWriter writer =
+    org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer =
         outFormat.getHiveRecordWriter(conf, testFilePath, StringRow.class, true,
             properties, Reporter.NULL);
     writer.write(serde.serialize(new StringRow("owen"), inspector));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index eaabc71..7f5134e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -65,8 +64,8 @@
   protected IMetaStoreClient ms;
   protected long sleepTime = 1000;

-  private MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
-  private File tmpdir;
+  private final MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+  private final File tmpdir;

   protected CompactorTest() throws Exception {
     HiveConf conf = new HiveConf();
@@ -329,10 +328,10 @@ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList
   }

   static class MockRawReader implements AcidInputFormat.RawReader {
-    private Stack filesToRead;
-    private Configuration conf;
+    private final Stack filesToRead;
+    private final Configuration conf;
     private FSDataInputStream is = null;
-    private FileSystem fs;
+    private final FileSystem fs;

     MockRawReader(Configuration conf, List files) throws IOException {
       filesToRead = new Stack();
@@ -408,12 +407,12 @@ public RecordUpdater getRecordUpdater(Path path, Options options) throws
     }

     @Override
-    public FSRecordWriter getRawRecordWriter(Path path, Options options) throws IOException {
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path, Options options) throws IOException {
       return new MockRecordWriter(path, options);
     }

     @Override
-    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
         Class valueClass, boolean isCompressed, Properties tableProperties,
         Progressable progress) throws IOException {
@@ -437,8 +436,8 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOEx
   // This class isn't used and I suspect does totally the wrong thing. It's only here so that I
   // can provide some output format to the tables and partitions I create. I actually write to
   // those tables directory.
-  static class MockRecordWriter implements FSRecordWriter {
-    private FSDataOutputStream os;
+  static class MockRecordWriter implements org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+    private final FSDataOutputStream os;

     MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException {
       FileSystem fs = FileSystem.get(options.getConfiguration());
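
Note on why this restores backward compatibility: before this patch, getHiveRecordWriter returned the short-lived FSRecordWriter type, which broke output formats compiled against the older API where the return type was the nested interface org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter. The patch moves every signature back to that nested interface. The sketch below is a hypothetical third-party output format (the package com.example.hive, the class UpperCaseTextOutputFormat, and its upper-casing behavior are illustrative assumptions, not part of this commit); it follows the same wrapping pattern as Rot13OutputFormat in the patch and compiles against the restored interface.

package com.example.hive; // hypothetical package, not part of this patch

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Progressable;

/**
 * Hypothetical plugin output format written against the nested
 * FileSinkOperator.RecordWriter interface that this commit restores as the
 * return type of getHiveRecordWriter, so pre-existing plugins keep compiling.
 */
public class UpperCaseTextOutputFormat
    extends HiveIgnoreKeyTextOutputFormat<LongWritable, Text> {

  @Override
  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress) throws IOException {
    // Delegate the actual file handling to the built-in text writer.
    final RecordWriter inner = super.getHiveRecordWriter(jc, outPath, valueClass,
        isCompressed, tableProperties, progress);
    // Wrap it, upper-casing each Text row before it is written out.
    return new RecordWriter() {
      @Override
      public void write(Writable w) throws IOException {
        if (w instanceof Text) {
          inner.write(new Text(w.toString().toUpperCase()));
        } else {
          inner.write(w);
        }
      }

      @Override
      public void close(boolean abort) throws IOException {
        inner.close(abort);
      }
    };
  }
}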