commit 0114df7df82207f8a0d8c2f42114e20009417db6
Author: Owen O'Malley
Date:   Wed Mar 19 14:42:39 2014 -0700

    HIVE-6699 Fix OrcRecordUpdater to use sync instead of flush.

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 3840a27..7f0260e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -49,9 +49,6 @@
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.Metadata;
-import org.apache.hadoop.hive.ql.io.orc.ReaderImpl.FileMetaInfo;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -104,10 +101,11 @@ AcidInputFormat {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
 
+  static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
   static final String MIN_SPLIT_SIZE =
-      ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE");
+      SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE");
   static final String MAX_SPLIT_SIZE =
-      ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
+      SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
 
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -350,7 +348,6 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
         new ArrayList<OrcSplit>(10000);
     private final int numBuckets;
     private final List<Throwable> errors = new ArrayList<Throwable>();
-    private final HadoopShims shims = ShimLoader.getHadoopShims();
     private final long maxSize;
     private final long minSize;
     private final boolean footerInSplits;
@@ -520,7 +517,7 @@ public void run() {
       // find the base files (original or new style)
       List<FileStatus> children = original;
       if (base != null) {
-        children = context.shims.listLocatedStatus(fs, base,
+        children = SHIMS.listLocatedStatus(fs, base,
             AcidUtils.hiddenFileFilter);
       }
 
@@ -624,7 +621,7 @@ private FileInfo verifyCachedFileInfo(FileStatus file) {
       this.file = file;
       this.blockSize = file.getBlockSize();
       this.fileInfo = fileInfo;
-      locations = context.shims.getLocations(fs, file);
+      locations = SHIMS.getLocations(fs, file);
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index e376993..b660380 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -254,7 +254,7 @@ public void flush() throws IOException {
     }
     long len = writer.writeIntermediateFooter();
     flushLengths.writeLong(len);
-    flushLengths.flush();
+    OrcInputFormat.SHIMS.hflush(flushLengths);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 026d5b4..bf518d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -2071,7 +2071,7 @@ public synchronized long writeIntermediateFooter() throws IOException {
       int footLength = writeFooter(rawWriter.getPos() - metaLength);
       rawWriter.writeByte(writePostScript(footLength, metaLength));
       stripesAtLastFlush = stripes.size();
-      rawWriter.flush();
+      OrcInputFormat.SHIMS.hflush(rawWriter);
     }
     return rawWriter.getPos();
   }
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index 61f8894..7042c26 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -626,6 +627,11 @@ public UserGroupInformation createProxyUser(String userName) throws IOException
   }
 
   @Override
+  public void hflush(FSDataOutputStream stream) throws IOException {
+    stream.sync();
+  }
+
+  @Override
   public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUserUgi,
       String ipAddress, Configuration conf) throws IOException {
     // This hadoop version doesn't have proxy verification
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index e66cda0..b1d7cb0 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -376,6 +377,11 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
   }
 
   @Override
+  public void hflush(FSDataOutputStream stream) throws IOException {
+    stream.sync();
+  }
+
+  @Override
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     return new ProxyFileSystem(fs, uri);
   }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index f56c23b..7c729fa 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -492,6 +493,11 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
     }
   }
 
+  @Override
+  public void hflush(FSDataOutputStream stream) throws IOException {
+    stream.hflush();
+  }
+
   class ProxyFileSystem23 extends ProxyFileSystem {
     public ProxyFileSystem23(FileSystem fs) {
       super(fs);
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 5c44793..9efbd4f 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -462,6 +463,13 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
   BlockLocation[] getLocations(FileSystem fs,
                                FileStatus status) throws IOException;
 
+  /**
+   * Flush and make visible to other users the changes to the given stream.
+   * @param stream the stream to hflush.
+   * @throws IOException
+   */
+  public void hflush(FSDataOutputStream stream) throws IOException;
+
   public HCatHadoopShims getHCatShim();
 
   public interface HCatHadoopShims {
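
Editor's note (commentary, not part of the patch): the change matters because FSDataOutputStream.flush() only drains the client-side buffers; it gives no guarantee that other HDFS clients can read the written bytes. The Syncable call (hflush() on Hadoop 0.23 and later, sync() on 0.20) pushes the bytes out to the datanodes, so a concurrent reader can see the intermediate ORC footer and the side-file length that the ACID reader depends on. Below is a minimal sketch of the behavior the patch relies on; the class name and path are made up for illustration and do not come from the patch.

    // Sketch only: flush() vs. hflush() visibility, assuming Hadoop 2.x
    // where FSDataOutputStream implements Syncable. The path below is
    // hypothetical, not taken from the patch.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path sideFile = new Path("/tmp/example_flush_length");  // hypothetical

        FSDataOutputStream out = fs.create(sideFile, true);
        out.writeLong(12345L);  // e.g. the length of an intermediate footer

        // out.flush() would only empty local buffers; a reader opening the
        // file now might still see zero bytes. hflush() forces the data out
        // to the datanodes, making it visible to new readers. (It does not
        // guarantee durability on disk; hsync() would be needed for that.)
        out.hflush();

        out.close();  // finalizes the file and records its final length
      }
    }

On Hadoop 0.20 the equivalent method is named sync(), which is why the call is routed through the HadoopShims layer instead of being invoked directly: each shim maps hflush() onto whatever the underlying FSDataOutputStream provides.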