Index: ql/pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/pom.xml (revision d91cc0cd84b7d0ecc0f29d44b109b46e21194eec)
+++ ql/pom.xml (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e)
@@ -275,6 +275,11 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.orc</groupId>
       <artifactId>orc-tools</artifactId>
       <version>${orc.version}</version>
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision d91cc0cd84b7d0ecc0f29d44b109b46e21194eec)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e)
@@ -24,6 +24,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@@ -38,6 +40,8 @@
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
@@ -88,6 +92,7 @@
 import java.util.concurrent.Future;
 
 import static java.lang.Integer.min;
+import static org.apache.hadoop.hive.common.FileUtils.isS3a;
 
 /**
  * HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -376,6 +381,17 @@
     return instance;
   }
 
+  /**
+   * Returns true if the given InputFormat reads its data with random
+   * seek+read access, as the columnar ORC and Parquet readers do.
+   * @param inputFormat the InputFormat that will read the current split
+   * @return true if the InputFormat performs random seek+read access
+   */
+  private static boolean isRandomAccessInputFormat(InputFormat inputFormat) {
+    return inputFormat instanceof OrcInputFormat ||
+        inputFormat instanceof VectorizedParquetInputFormat;
+  }
+
@Override
public RecordReader getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
@@ -428,6 +444,17 @@
innerReader = HiveIOExceptionHandlerUtil
.handleRecordReaderCreationException(e, job);
}
+
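+    // ORC and Parquet issue many short positioned reads (footer first, then
+    // column chunks); S3A's default sequential read-ahead penalizes that
+    // pattern, so switch the stream input policy to RANDOM for these formats.
+    // Note that this changes the policy on the cached, shared FileSystem.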
+ FileSystem splitFileSystem = splitPath.getFileSystem(job);
+ if (isS3a(splitFileSystem) && isRandomAccessInputFormat(inputFormat)) {
+ LOG.debug("Changing S3A input policy to RANDOM for split {}", splitPath);
+ ((S3AFileSystem) splitFileSystem).setInputPolicy(S3AInputPolicy.Random);
+ }
+
HiveRecordReader rr = new HiveRecordReader(innerReader, job);
rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
return rr;
Index: ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (revision d91cc0cd84b7d0ecc0f29d44b109b46e21194eec)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e)
@@ -2156,13 +2156,14 @@
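+   * @param currFileSystemName class name of the FileSystem to register for the mock scheme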
* @throws IOException
* @throws HiveException
*/
- JobConf createMockExecutionEnvironment(Path workDir,
+ static JobConf createMockExecutionEnvironment(Path workDir,
Path warehouseDir,
String tableName,
ObjectInspector objectInspector,
boolean isVectorized,
- int partitions
- ) throws IOException, HiveException {
+ int partitions,
+ String currFileSystemName) throws IOException, HiveException {
JobConf conf = new JobConf();
Utilities.clearWorkMap(conf);
conf.set("hive.exec.plan", workDir.toString());
@@ -2171,7 +2172,7 @@
conf.set("hive.vectorized.execution.enabled", isVectorizedString);
conf.set(Utilities.VECTOR_MODE, isVectorizedString);
conf.set(Utilities.USE_VECTORIZED_INPUT_FILE_FORMAT, isVectorizedString);
- conf.set("fs.mock.impl", MockFileSystem.class.getName());
+ conf.set("fs.mock.impl", currFileSystemName);
conf.set("mapred.mapper.class", ExecMapper.class.getName());
Path root = new Path(warehouseDir, tableName);
// clean out previous contents
@@ -2290,7 +2291,7 @@
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorization", inspector, true, 1);
+ "vectorization", inspector, true, 1, MockFileSystem.class.getName());
// write the orc file to the mock file system
Path path = new Path(conf.get("mapred.input.dir") + "/0_0");
@@ -2337,7 +2338,7 @@
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorBuckets", inspector, true, 1);
+ "vectorBuckets", inspector, true, 1, MockFileSystem.class.getName());
// write the orc file to the mock file system
Path path = new Path(conf.get("mapred.input.dir") + "/0_0");
@@ -2376,7 +2377,7 @@
public void testVectorizationWithAcid() throws Exception {
StructObjectInspector inspector = new BigRowInspector();
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorizationAcid", inspector, true, 1);
+ "vectorizationAcid", inspector, true, 1, MockFileSystem.class.getName());
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
@@ -2457,7 +2458,7 @@
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combination", inspector, false, 1);
+ "combination", inspector, false, 1, MockFileSystem.class.getName());
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -2529,7 +2530,7 @@
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combinationAcid", inspector, false, PARTITIONS);
+ "combinationAcid", inspector, false, PARTITIONS, MockFileSystem.class.getName());
// write the orc file to the mock file system
Path[] partDir = new Path[PARTITIONS];
@@ -3340,7 +3341,7 @@
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf jobConf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "mocktable3", inspector, true, 0);
+ "mocktable3", inspector, true, 0, MockFileSystem.class.getName());
Writer writer =
OrcFile.createWriter(new Path(mockPath + "/0_0"),
OrcFile.writerOptions(conf).blockPadding(false)
@@ -3413,7 +3414,7 @@
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf jobConf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "mocktable4", inspector, true, 0);
+ "mocktable4", inspector, true, 0, MockFileSystem.class.getName());
Writer writer =
OrcFile.createWriter(new Path(mockPath + "/0_0"),
OrcFile.writerOptions(conf).blockPadding(false)
Index: ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRandomAccessHiveInputFormat.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRandomAccessHiveInputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRandomAccessHiveInputFormat.java (revision aa9416d6ef1d72f908d3f23aabe48d33808d3a9e)
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.createMockExecutionEnvironment;
+import static org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.setBlocks;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestRandomAccessHiveInputFormat {
+
+  private final Path workDir = new Path(System.getProperty("test.tmp.dir", "target/tmp"));
+
+  /**
+   * A MockFileSystem that masquerades as S3A by reporting the "s3a" scheme,
+   * so that FileUtils.isS3a() treats it as an S3A filesystem.
+   */
+ public static class MockS3aFileSystem
+ extends TestInputOutputFormat.MockFileSystem {
+
+ @Override
+ public String getScheme() {
+ return "s3a";
+ }
+ }
+
+  /**
+   * Make sure the filesystem input policy is switched to RANDOM when an ORC
+   * split is read from an S3A filesystem.
+   */
+  @Test
+  public void testOrcSplitOnS3A() throws Exception {
+ // get the object inspector for MyRow
+ StructObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = (StructObjectInspector)
+ ObjectInspectorFactory.getReflectionObjectInspector(TestInputOutputFormat.MyRow.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // Use ORC files stored on S3A
+ JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
+ "randomAccessVectorized", inspector, true, 1, MockS3aFileSystem.class.getName());
+
+ // write the orc file to the mock file system
+ Path path = new Path(conf.get("mapred.input.dir") + "/0_0");
+ Writer writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).blockPadding(false)
+ .bufferSize(1024).inspector(inspector));
+ writer.addRow(new TestInputOutputFormat.MyRow(0, 0));
+ writer.addRow(new TestInputOutputFormat.MyRow(1, 2));
+ writer.close();
+
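+    // The file occupies a single mock block, so we expect a single split below.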
+ setBlocks(path, conf, new TestInputOutputFormat.MockBlock("host0"));
+
+    HiveInputFormat<WritableComparable, Writable> inputFormat = new HiveInputFormat<>();
+
+ InputSplit[] splits = inputFormat.getSplits(conf, 2);
+ assertEquals(1, splits.length);
+
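+    // Creating the record reader is what triggers the S3A input-policy switch.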
+ Throwable thrown = null;
+ try {
+ inputFormat.getRecordReader(splits[0], conf, Reporter.NULL);
+ } catch (Exception e) {
+ thrown = e;
+ }
+
+    // MockS3aFileSystem reports the s3a scheme but is not a real S3AFileSystem,
+    // so the cast in HiveInputFormat fails with a ClassCastException, which
+    // proves that the RANDOM input-policy branch was reached.
+    assertNotNull(thrown);
+    assertEquals(ClassCastException.class, thrown.getClass());
+    assertTrue(thrown.getMessage().contains("S3AFileSystem"));
+ }
+}