Index: ql/pom.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ql/pom.xml (revision d91cc0cd84b7d0ecc0f29d44b109b46e21194eec) +++ ql/pom.xml (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e) @@ -275,6 +275,11 @@ true + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + org.apache.orc orc-tools ${orc.version} Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision d91cc0cd84b7d0ecc0f29d44b109b46e21194eec) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e) @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -38,6 +40,8 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; @@ -88,6 +92,7 @@ import java.util.concurrent.Future; import static java.lang.Integer.min; +import static org.apache.hadoop.hive.common.FileUtils.isS3a; /** * HiveInputFormat is a parameterized InputFormat which 
looks at the path name @@ -376,6 +381,19 @@ return instance; } + /** + * Returns true if the inputFormat performs random seek+read + * @param inputFormat + * @return + */ + private static boolean isRandomAccessInputFormat(InputFormat inputFormat) { + if (inputFormat instanceof OrcInputFormat || + inputFormat instanceof VectorizedParquetInputFormat) { + return true; + } + return false; + } + @Override public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { @@ -428,6 +446,13 @@ innerReader = HiveIOExceptionHandlerUtil .handleRecordReaderCreationException(e, job); } + + FileSystem splitFileSystem = splitPath.getFileSystem(job); + if (isS3a(splitFileSystem) && isRandomAccessInputFormat(inputFormat)) { + LOG.debug("Changing S3A input policy to RANDOM for split {}", splitPath); + ((S3AFileSystem) splitFileSystem).setInputPolicy(S3AInputPolicy.Random); + } + HiveRecordReader rr = new HiveRecordReader(innerReader, job); rr.initIOContext(hsplit, job, inputFormatClass, innerReader); return rr; Index: ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (revision d91cc0cd84b7d0ecc0f29d44b109b46e21194eec) +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e) @@ -2156,13 +2156,13 @@ * @throws IOException * @throws HiveException */ - JobConf createMockExecutionEnvironment(Path workDir, + static JobConf createMockExecutionEnvironment(Path workDir, Path warehouseDir, String tableName, ObjectInspector objectInspector, boolean isVectorized, - int partitions - ) throws IOException, HiveException { + int partitions, + String currFileSystemName) throws IOException, HiveException { JobConf conf = new JobConf(); 
Utilities.clearWorkMap(conf); conf.set("hive.exec.plan", workDir.toString()); @@ -2171,7 +2171,7 @@ conf.set("hive.vectorized.execution.enabled", isVectorizedString); conf.set(Utilities.VECTOR_MODE, isVectorizedString); conf.set(Utilities.USE_VECTORIZED_INPUT_FILE_FORMAT, isVectorizedString); - conf.set("fs.mock.impl", MockFileSystem.class.getName()); + conf.set("fs.mock.impl", currFileSystemName); conf.set("mapred.mapper.class", ExecMapper.class.getName()); Path root = new Path(warehouseDir, tableName); // clean out previous contents @@ -2290,7 +2290,7 @@ ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "vectorization", inspector, true, 1); + "vectorization", inspector, true, 1, MockFileSystem.class.getName()); // write the orc file to the mock file system Path path = new Path(conf.get("mapred.input.dir") + "/0_0"); @@ -2337,7 +2337,7 @@ ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "vectorBuckets", inspector, true, 1); + "vectorBuckets", inspector, true, 1, MockFileSystem.class.getName()); // write the orc file to the mock file system Path path = new Path(conf.get("mapred.input.dir") + "/0_0"); @@ -2376,7 +2376,7 @@ public void testVectorizationWithAcid() throws Exception { StructObjectInspector inspector = new BigRowInspector(); JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "vectorizationAcid", inspector, true, 1); + "vectorizationAcid", inspector, true, 1, MockFileSystem.class.getName()); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); @@ -2457,7 +2457,7 @@ ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "combination", inspector, false, 1); + "combination", inspector, false, 1, 
MockFileSystem.class.getName()); // write the orc file to the mock file system Path partDir = new Path(conf.get("mapred.input.dir")); @@ -2529,7 +2529,7 @@ ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "combinationAcid", inspector, false, PARTITIONS); + "combinationAcid", inspector, false, PARTITIONS, MockFileSystem.class.getName()); // write the orc file to the mock file system Path[] partDir = new Path[PARTITIONS]; @@ -3340,7 +3340,7 @@ ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf jobConf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "mocktable3", inspector, true, 0); + "mocktable3", inspector, true, 0, MockFileSystem.class.getName()); Writer writer = OrcFile.createWriter(new Path(mockPath + "/0_0"), OrcFile.writerOptions(conf).blockPadding(false) @@ -3413,7 +3413,7 @@ ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf jobConf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - "mocktable4", inspector, true, 0); + "mocktable4", inspector, true, 0, MockFileSystem.class.getName()); Writer writer = OrcFile.createWriter(new Path(mockPath + "/0_0"), OrcFile.writerOptions(conf).blockPadding(false) Index: ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRandomAccessHiveInputFormat.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRandomAccessHiveInputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e) +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRandomAccessHiveInputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e) @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; + +import static org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.createMockExecutionEnvironment; +import static org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.setBlocks; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRandomAccessHiveInputFormat { + + Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp")); + + /** + * MockFileSystem that pretends to be an S3A system + */ + public static class MockS3aFileSystem + extends TestInputOutputFormat.MockFileSystem { + + @Override + public String getScheme() { + return "s3a"; + } + } + + @Test + // Make sure that the FS InputPolicy is changed to Random for ORC on S3A + public void testOrcSplitOnS3A() throws Exception { 
+ // get the object inspector for MyRow + StructObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = (StructObjectInspector) + ObjectInspectorFactory.getReflectionObjectInspector(TestInputOutputFormat.MyRow.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // Use ORC files stored on S3A + JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), + "randomAccessVectorized", inspector, true, 1, MockS3aFileSystem.class.getName()); + + // write the orc file to the mock file system + Path path = new Path(conf.get("mapred.input.dir") + "/0_0"); + Writer writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + writer.addRow(new TestInputOutputFormat.MyRow(0, 0)); + writer.addRow(new TestInputOutputFormat.MyRow(1, 2)); + writer.close(); + + setBlocks(path, conf, new TestInputOutputFormat.MockBlock("host0")); + + HiveInputFormat inputFormat = new HiveInputFormat<>(); + + InputSplit[] splits = inputFormat.getSplits(conf, 2); + assertEquals(1, splits.length); + + Throwable thrown = null; + try { + inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); + } catch (Exception e) { + thrown = e; + } + + // As we are mocking a simple FS we just expect a cast exception to occur + assertEquals(thrown.getClass(), ClassCastException.class); + assertTrue(thrown.getMessage().contains("S3AFileSystem")); +}