commit 2b113e29ff953355f19503cbd85b396f970c8c06
Author: VM User
Date:   Tue Jan 21 16:46:04 2014 -0800

    update patch

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a78b72f..4d3b8c5 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -517,8 +517,19 @@
     // Define the default ORC stripe size
     HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size",
         256L * 1024 * 1024),
-
-    HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
+    // Define the default ORC index stride
+    HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride"
+        , null),
+    // Define the default ORC buffer size
+    HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", null),
+    // Define the default block padding
+    HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding",
+        null),
+    // Define the default ORC compression codec
+    HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", null),
+
+    HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD(
+        "hive.exec.orc.dictionary.key.size.threshold", 0.8f),

     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 54aee08..e864473 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -153,6 +153,19 @@ public static Reader createReader(FileSystem fs, Path path, FileMetaInfo fileMet
       stripeSizeValue =
           conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
               DEFAULT_STRIPE_SIZE);
+      rowIndexStrideValue =
+          conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE
+              .varname, DEFAULT_ROW_INDEX_STRIDE);
+      bufferSizeValue =
+          conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname,
+              DEFAULT_BUFFER_SIZE);
+      blockPaddingValue =
+          conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
+              .varname, DEFAULT_BLOCK_PADDING);
+      compressValue =
+          CompressionKind.valueOf(conf.get(HiveConf.ConfVars
+              .HIVE_ORC_DEFAULT_COMPRESS.varname,
+              DEFAULT_COMPRESSION_KIND.toString()));
       String versionName = conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
       if (versionName == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index a5747a6..b9b4651 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -44,9 +44,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.orc.Metadata;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileGenerator;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator;
 import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -63,7 +62,6 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
@@ -99,8 +97,8 @@ private static final double MIN_INCLUDED_LOCATION = 0.80;

   private static class OrcRecordReader
-      implements RecordReader<NullWritable, OrcStruct> {
-    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+      implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> {
+    private final RecordReader reader;
     private final long offset;
     private final long length;
     private final int numColumns;
@@ -111,10 +109,7 @@ long offset, long length) throws IOException {
       List<OrcProto.Type> types = file.getTypes();
       numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
-      boolean[] includedColumns = findIncludedColumns(types, conf);
-      String[] columnNames = getIncludedColumnNames(types, includedColumns, conf);
-      SearchArgument sarg = createSarg(types, conf);
-      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+      this.reader = createReaderFromFile(file, conf, offset, length);
       this.offset = offset;
       this.length = length;
     }
@@ -155,6 +150,19 @@ public float getProgress() throws IOException {
       return progress;
     }
   }
+
+  static RecordReader createReaderFromFile(
+      Reader file, Configuration conf, long offset, long length)
+      throws IOException {
+    List<OrcProto.Type> types = file.getTypes();
+    boolean[] includedColumns = findIncludedColumns(types, conf);
+    String[] columnNames = getIncludedColumnNames(types, includedColumns,
+        conf);
+    SearchArgument sarg = createSarg(types, conf);
+    RecordReader reader =
+        file.rows(offset, length, includedColumns, sarg, columnNames);
+    return reader;
+  }

   private static final PathFilter hiddenFileFilter = new PathFilter(){
     public boolean accept(Path p){
@@ -244,14 +252,15 @@ public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration
     }
   }

+  @SuppressWarnings("unchecked")
   @Override
-  public RecordReader<NullWritable, OrcStruct>
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
       getRecordReader(InputSplit inputSplit, JobConf conf,
                       Reporter reporter) throws IOException {
     if (isVectorMode(conf)) {
-      RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
+      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
           reporter);
-      return (RecordReader<NullWritable, OrcStruct>) vorr;
+      return (org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>) vorr;
     }
     FileSplit fSplit = (FileSplit)inputSplit;
     reporter.setStatus(fSplit.toString());
@@ -308,7 +317,7 @@ private boolean isVectorMode(Configuration conf) {
    * @param conf The configuration of the job
    * @return the list of input {@link Path}s for the map-reduce job.
    */
-  static Path[] getInputPaths(JobConf conf) throws IOException {
+  static Path[] getInputPaths(Configuration conf) throws IOException {
     String dirs = conf.get("mapred.input.dir");
     if (dirs == null) {
       throw new IOException("Configuration mapred.input.dir is not defined.");
@@ -326,10 +335,41 @@ private boolean isVectorMode(Configuration conf) {
   * the different worker threads.
   */
  static class Context {
+    static class FileSplitInfo {
+      FileSplitInfo(Path file, long start, long length, String[] hosts,
+          FileMetaInfo fileMetaInfo) {
+        this.file = file;
+        this.start = start;
+        this.length = length;
+        this.hosts = hosts;
+        this.fileMetaInfo = fileMetaInfo;
+      }
+      Path getPath() {
+        return file;
+      }
+      long getStart() {
+        return start;
+      }
+      long getLength() {
+        return length;
+      }
+      String[] getLocations() {
+        return hosts;
+      }
+      FileMetaInfo getFileMetaInfo() {
+        return fileMetaInfo;
+      }
+      private Path file;
+      private long start;
+      private long length;
+      private String[] hosts;
+      FileMetaInfo fileMetaInfo;
+    }
     private final Configuration conf;
     private static Cache<Path, FileInfo> footerCache;
     private final ExecutorService threadPool;
-    private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
+    private final List<FileSplitInfo> splits =
+        new ArrayList<FileSplitInfo>(10000);
     private final List<Throwable> errors = new ArrayList<Throwable>();
     private final HadoopShims shims = ShimLoader.getHadoopShims();
     private final long maxSize;
@@ -378,7 +418,7 @@ int getSchedulers() {
      * the back.
      * @result the Nth file split
      */
-    OrcSplit getResult(int index) {
+    FileSplitInfo getResult(int index) {
       if (index >= 0) {
         return splits.get(index);
       } else {
@@ -556,8 +596,8 @@ void schedule() throws IOException {
       if(locations.length == 1 && file.getLen() < context.maxSize) {
         String[] hosts = locations[0].getHosts();
         synchronized (context.splits) {
-          context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
-              hosts, fileMetaInfo));
+          context.splits.add(new Context.FileSplitInfo(file.getPath(), 0,
+              file.getLen(), hosts, fileMetaInfo));
         }
       } else {
         // if it requires a compute task
@@ -643,8 +683,8 @@ void createSplit(long offset, long length, FileMetaInfo fileMetaInfo) throws IOE
         hostList.toArray(hosts);
       }
       synchronized (context.splits) {
-        context.splits.add(new OrcSplit(file.getPath(), offset, length,
-            hosts, fileMetaInfo));
+        context.splits.add(new Context.FileSplitInfo(file.getPath(), offset,
+            length, hosts, fileMetaInfo));
       }
     }
@@ -851,35 +891,45 @@ private Object getMin(ColumnStatistics index) {
     }
   }
+  static List<Context.FileSplitInfo> generateSplitsInfo(Configuration conf)
+      throws IOException {
+    // use threads to resolve directories into splits
+    Context context = new Context(conf);
+    for(Path dir: getInputPaths(conf)) {
+      FileSystem fs = dir.getFileSystem(conf);
+      context.schedule(new FileGenerator(context, fs, dir));
+    }
+    context.waitForTasks();
+    // deal with exceptions
+    if (!context.errors.isEmpty()) {
+      List<IOException> errors =
+          new ArrayList<IOException>(context.errors.size());
+      for(Throwable th: context.errors) {
+        if (th instanceof IOException) {
+          errors.add((IOException) th);
+        } else {
+          throw new RuntimeException("serious problem", th);
+        }
+      }
+      throw new InvalidInputException(errors);
+    }
+    if (context.cacheStripeDetails) {
+      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+          + context.numFilesCounter.get());
+    }
+    return context.splits;
+  }

   @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
-    // use threads to resolve directories into splits
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    Context context = new Context(job);
-    for(Path dir: getInputPaths(job)) {
-      FileSystem fs = dir.getFileSystem(job);
-      context.schedule(new FileGenerator(context, fs, dir));
-    }
-    context.waitForTasks();
-    // deal with exceptions
-    if (!context.errors.isEmpty()) {
-      List<IOException> errors =
-          new ArrayList<IOException>(context.errors.size());
-      for(Throwable th: context.errors) {
-        if (th instanceof IOException) {
-          errors.add((IOException) th);
-        } else {
-          throw new RuntimeException("serious problem", th);
-        }
-      }
-      throw new InvalidInputException(errors);
-    }
-    InputSplit[] result = new InputSplit[context.splits.size()];
-    context.splits.toArray(result);
-    if (context.cacheStripeDetails) {
-      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
-          + context.numFilesCounter.get());
+    List<Context.FileSplitInfo> splits =
+        OrcInputFormat.generateSplitsInfo(job);
+    InputSplit[] result = new InputSplit[splits.size()];
+    for (int i=0;i<splits.size();i++) {
+      Context.FileSplitInfo split = splits.get(i);
+      result[i] = new OrcSplit(split.getPath(), split.getStart(),
+          split.getLength(), split.getLocations(), split.getFileMetaInfo());
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
new file mode 100644
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
+public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
+  private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  private static final String CLASS_NAME = ReaderImpl.class.getName();
+
+  @Override
+  public RecordReader<NullWritable, OrcStruct> createRecordReader(
+      InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(ShimLoader.getHadoopShims()
+        .getConfiguration(context));
+    return new OrcRecordReader(OrcFile.createReader(fs, path),
+        ShimLoader.getHadoopShims().getConfiguration(context),
+        fileSplit.getStart(), fileSplit.getLength());
+  }
+
+  private static class OrcRecordReader
+      extends RecordReader<NullWritable, OrcStruct> {
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final int numColumns;
+    OrcStruct value;
+    private float progress = 0.0f;
+
+    OrcRecordReader(Reader file, Configuration conf,
+                    long offset, long length) throws IOException {
+      List<OrcProto.Type> types = file.getTypes();
+      numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+      value = new OrcStruct(numColumns);
+      this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
+          length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public OrcStruct getCurrentValue() throws IOException,
+        InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return progress;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reader.hasNext()) {
+        reader.next(value);
+        progress = reader.getProgress();
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+    List<OrcInputFormat.Context.FileSplitInfo> splits =
+        OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
+            .getConfiguration(jobContext));
+    List<InputSplit> result = new ArrayList<InputSplit>();
+    for (OrcInputFormat.Context.FileSplitInfo split : splits) {
+      FileSplit newSplit = new OrcNewSplit(split.getPath(),
+          split.getStart(), split.getLength(), split.getLocations(),
+          split.getFileMetaInfo());
+      result.add(newSplit);
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+    return result;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java
new file mode 100644
index 0000000..95e95c6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** An OutputFormat that writes ORC files. */ +public class OrcNewOutputFormat extends + FileOutputFormat { + + private static class OrcRecordWriter + extends RecordWriter { + private Writer writer = null; + private final Path path; + private final OrcFile.WriterOptions options; + OrcRecordWriter(Path path, OrcFile.WriterOptions options) { + this.path = path; + this.options = options; + } + @Override + public void write(NullWritable key, OrcSerdeRow row) + throws IOException, InterruptedException { + if (writer == null) { + options.inspector(row.getInspector()); + writer = OrcFile.createWriter(path, options); + } + writer.addRow(row.getRow()); + } + + @Override + public void close(TaskAttemptContext context) + throws IOException, InterruptedException { + if (writer == null) { + // a row with no columns + ObjectInspector inspector = ObjectInspectorFactory. + getStandardStructObjectInspector(new ArrayList(), + new ArrayList()); + options.inspector(inspector); + writer = OrcFile.createWriter(path, options); + } + writer.close(); + } + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + Path file = getDefaultWorkFile(context, ""); + return new + OrcRecordWriter(file, OrcFile.writerOptions( + ShimLoader.getHadoopShims().getConfiguration(context))); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java new file mode 100644 index 0000000..3a5ba1b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * OrcFileSplit. Holds file meta info + * + */ +public class OrcNewSplit extends FileSplit { + private Reader.FileMetaInfo fileMetaInfo; + private boolean hasFooter; + + protected OrcNewSplit(){ + //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it. + //This constructor is used to create the object and then call readFields() + // so just pass nulls to this super constructor. + super(null, 0, 0, (String[])null); + } + + public OrcNewSplit(Path path, long offset, long length, String[] hosts, + FileMetaInfo fileMetaInfo) { + super(path, offset, length, hosts); + this.fileMetaInfo = fileMetaInfo; + hasFooter = this.fileMetaInfo != null; + } + + @Override + public void write(DataOutput out) throws IOException { + //serialize path, offset, length using FileSplit + super.write(out); + + // Whether footer information follows. + out.writeBoolean(hasFooter); + + if (hasFooter) { + // serialize FileMetaInfo fields + Text.writeString(out, fileMetaInfo.compressionType); + WritableUtils.writeVInt(out, fileMetaInfo.bufferSize); + WritableUtils.writeVInt(out, fileMetaInfo.metadataSize); + + // serialize FileMetaInfo field footer + ByteBuffer footerBuff = fileMetaInfo.footerBuffer; + footerBuff.reset(); + + // write length of buffer + WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position()); + out.write(footerBuff.array(), footerBuff.position(), + footerBuff.limit() - footerBuff.position()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + //deserialize path, offset, length using FileSplit + super.readFields(in); + + hasFooter = in.readBoolean(); + + if (hasFooter) { + // deserialize FileMetaInfo fields + String compressionType = Text.readString(in); + int bufferSize = WritableUtils.readVInt(in); + int metadataSize = WritableUtils.readVInt(in); + + // deserialize FileMetaInfo field footer + int footerBuffSize = WritableUtils.readVInt(in); + ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize); + in.readFully(footerBuff.array(), 0, footerBuffSize); + + fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff); + } + } + + public FileMetaInfo getFileMetaInfo(){ + return fileMetaInfo; + } + + public boolean hasFooter() { + return hasFooter; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java index b81ca46..226a106 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.Writable; -final class OrcStruct 
implements Writable { +final public class OrcStruct implements Writable { private Object[] fields; @@ -461,7 +461,7 @@ public boolean equals(Object o) { } } - static ObjectInspector createObjectInspector(TypeInfo info) { + static public ObjectInspector createObjectInspector(TypeInfo info) { switch (info.getCategory()) { case PRIMITIVE: switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 93b50c6..7552b2c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -428,7 +428,7 @@ public void testAddSplit() throws Exception { new OrcInputFormat.SplitGenerator(context, fs, fs.getFileStatus(new Path("/a/file")), null); splitter.createSplit(0, 200, null); - FileSplit result = context.getResult(-1); + OrcInputFormat.Context.FileSplitInfo result = context.getResult(-1); assertEquals(0, result.getStart()); assertEquals(200, result.getLength()); assertEquals("/a/file", result.getPath().toString()); @@ -477,7 +477,7 @@ public void testSplitGenerator() throws Exception { } throw new IOException("Errors during splitting"); } - FileSplit result = context.getResult(0); + OrcInputFormat.Context.FileSplitInfo result = context.getResult(0); assertEquals(3, result.getStart()); assertEquals(497, result.getLength()); result = context.getResult(1); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java new file mode 100644 index 0000000..1a87c0a --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.orc; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.assertFalse; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hive.common.util.HiveTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class TestNewInputOutputFormat { + + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + + Configuration conf; + FileSystem localFs; + + @Before + public void setup() throws Exception { + conf = new Configuration(); + conf.set("mapred.job.tracker", "local"); + conf.set("fs.default.name", "local"); + localFs = FileSystem.get(conf); + } + + @Rule + public TestName testCaseName = new TestName(); + + public static class OrcTestMapper1 extends + Mapper { + @Override + public void map(Object key, Writable value, Context context) + throws IOException, InterruptedException { + context.write(null, new Text(value.toString())); + } + } + + @Test + // Test regular inputformat + public void testNewInputFormat() throws Exception { + Job job = new Job(conf, "orc test"); + job.setInputFormatClass(OrcNewInputFormat.class); + job.setJarByClass(TestNewInputOutputFormat.class); + job.setMapperClass(OrcTestMapper1.class); + job.setNumReduceTasks(0); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, + new Path(HiveTestUtils.getFileFromClasspath("orc-file-11-format.orc"))); + Path outputPath = new Path(workDir, + "TestOrcFile." 
+ testCaseName.getMethodName() + ".txt"); + localFs.delete(outputPath, true); + FileOutputFormat.setOutputPath(job, outputPath); + boolean result = job.waitForCompletion(true); + assertTrue(result); + Path outputFilePath = new Path(outputPath, "part-m-00000"); + + assertTrue(localFs.exists(outputFilePath)); + BufferedReader reader = new BufferedReader( + new InputStreamReader(localFs.open(outputFilePath))); + int count=0; + String line; + String lastLine=null; + while ((line=reader.readLine()) != null) { + count++; + lastLine = line; + } + reader.close(); + assertEquals(count, 7500); + assertEquals(lastLine, "{true, 100, 2048, 65536," + + " 9223372036854775807, 2.0, -5.0" + + ", , bye, {[{1, bye}, {2, sigh}]}, [{100000000, cat}," + + " {-100000, in}, {1234, hat}]," + + " {chani={5, chani}, mauddib={1, mauddib}}," + + " 2000-03-12 15:00:01.0, 12345678.6547457}"); + localFs.delete(outputPath, true); + } + + public static class OrcTestMapper2 extends Mapper { + private final TypeInfo typeInfo = TypeInfoUtils + .getTypeInfoFromTypeString("struct"); + private final ObjectInspector oip = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + private final OrcSerde serde = new OrcSerde(); + private Writable row; + @Override + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + String[] items = value.toString().split(","); + List struct = new ArrayList(2); + struct.add(0, Integer.parseInt(items[0])); + struct.add(1, items[1]); + row = serde.serialize(struct, oip); + context.write(null, row); + } + } + + @Test + //Test regular outputformat + public void testNewOutputFormat() throws Exception { + int rownum=1000; + + Path inputPath = new Path(workDir, "TestOrcFile." + + testCaseName.getMethodName() + ".txt"); + Path outputPath = new Path(workDir, "TestOrcFile." 
+ + testCaseName.getMethodName() + ".orc"); + localFs.delete(outputPath, true); + PrintWriter pw = new PrintWriter( + new OutputStreamWriter(localFs.create(inputPath))); + Random r = new Random(1000L); + boolean firstRow = true; + int firstIntValue = 0; + String firstStringValue = null; + for (int i=0;i { + @Override + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + String items[] = value.toString().split("\\s+"); + context.write(new IntWritable(items.length), value); + } + } + + public static class OrcTestReducer3 extends + Reducer { + final static TypeInfo typeInfo = + TypeInfoUtils.getTypeInfoFromTypeString( + "struct>," + + "wordcounts:map>"); + private final ObjectInspector oip = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + private final OrcSerde serde = new OrcSerde(); + private Writable row; + + @Override + public void reduce(IntWritable key, Iterable values, Context context) + throws IOException, InterruptedException { + List lastwords = new ArrayList(); + Map wordCounts = new HashMap(); + int count = 0; + for (Text val : values) { + String[] items = val.toString().toLowerCase().split("\\s+"); + lastwords.add(items[items.length-1]); + for (String item : items) { + if (wordCounts.containsKey(item)) { + wordCounts.put(item, wordCounts.get(item)+1); + } else { + wordCounts.put(item, 1); + } + } + count++; + } + List struct = new ArrayList(4); + struct.add(0, key.get()); + struct.add(1, count); + List> lastWordInfoList = new ArrayList>(); + for (String word : lastwords) { + List info = new ArrayList(2); + info.add(0, word); + info.add(1, word.length()); + lastWordInfoList.add(info); + } + struct.add(2, lastWordInfoList); + struct.add(3, wordCounts); + row = serde.serialize(struct, oip); + context.write(NullWritable.get(), row); + } + } + + @SuppressWarnings("unchecked") + @Test + //Test outputformat with complex data type, and with reduce + public void testNewOutputFormatComplex() throws Exception { + Path inputPath = new Path(workDir, "TestOrcFile." + + testCaseName.getMethodName() + ".txt"); + Path outputPath = new Path(workDir, "TestOrcFile." 
+ + testCaseName.getMethodName() + ".orc"); + localFs.delete(outputPath, true); + PrintWriter pw = new PrintWriter( + new OutputStreamWriter(localFs.create(inputPath))); + pw.println("I have eaten"); + pw.println("the plums"); + pw.println("that were in"); + pw.println("the icebox"); + pw.println("and which"); + pw.println("you were probably"); + pw.println("saving"); + pw.println("for breakfast"); + pw.println("Forgive me"); + pw.println("they were delicious"); + pw.println("so sweet"); + pw.println("and so cold"); + pw.close(); + + Job job = new Job(conf, "orc test"); + job.setOutputFormatClass(OrcNewOutputFormat.class); + job.setJarByClass(TestNewInputOutputFormat.class); + job.setMapperClass(OrcTestMapper3.class); + job.setReducerClass(OrcTestReducer3.class); + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(OrcSerdeRow.class); + FileInputFormat.addInputPath(job, inputPath); + FileOutputFormat.setOutputPath(job, outputPath); + boolean result = job.waitForCompletion(true); + assertTrue(result); + + Path outputFilePath = new Path(outputPath, "part-r-00000"); + Reader reader = OrcFile.createReader(localFs, outputFilePath); + + RecordReader rows = reader.rows(null); + ObjectInspector orcOi = reader.getObjectInspector(); + ObjectInspector stoi = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(OrcTestReducer3.typeInfo); + ObjectInspectorConverters.Converter converter = ObjectInspectorConverters + .getConverter(orcOi, stoi); + + Object row = rows.next(null); + List converted = (List)converter.convert(row); + assertEquals(converted.get(0), 1); + assertEquals(converted.get(1), 1); + List list = (List)converted.get(2); + assertEquals(list.size(), 1); + assertEquals(((List)list.get(0)).get(0), "saving"); + assertEquals(((List)list.get(0)).get(1), 6); + Map map = (Map)converted.get(3); + assertEquals(map.size(), 1); + assertEquals(map.get("saving"), new Integer(1)); + + row = rows.next(null); + converted = (List)converter.convert(row); + assertEquals(converted.get(0), 2); + assertEquals(converted.get(1), 6); + list = (List)converted.get(2); + assertEquals(list.size(), 6); + assertEquals(((List)list.get(0)).get(0), "plums"); + assertEquals(((List)list.get(0)).get(1), 5); + map = (Map)converted.get(3); + assertEquals(map.size(), 11); + assertEquals(map.get("the"), new Integer(2)); + + row = rows.next(null); + converted = (List)converter.convert(row); + assertEquals(converted.get(0), 3); + assertEquals(converted.get(1), 5); + list = (List)converted.get(2); + assertEquals(list.size(), 5); + assertEquals(((List)list.get(0)).get(0), "eaten"); + assertEquals(((List)list.get(0)).get(1), 5); + map = (Map)converted.get(3); + assertEquals(map.size(), 13); + assertEquals(map.get("were"), new Integer(3)); + + assertFalse(rows.hasNext()); + + localFs.delete(outputPath, true); + } + + @Test + // Test inputformat with column prune + public void testNewInputFormatPruning() throws Exception { + conf.set("hive.io.file.read.all.columns", "false"); + conf.set("hive.io.file.readcolumn.ids", "1,3"); + Job job = new Job(conf, "orc test"); + job.setInputFormatClass(OrcNewInputFormat.class); + job.setJarByClass(TestNewInputOutputFormat.class); + job.setMapperClass(OrcTestMapper1.class); + job.setNumReduceTasks(0); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, new Path(HiveTestUtils + 
.getFileFromClasspath("orc-file-11-format.orc"))); + Path outputPath = new Path(workDir, "TestOrcFile." + + testCaseName.getMethodName() + ".txt"); + localFs.delete(outputPath, true); + FileOutputFormat.setOutputPath(job, outputPath); + boolean result = job.waitForCompletion(true); + assertTrue(result); + Path outputFilePath = new Path(outputPath, "part-m-00000"); + + BufferedReader reader = new BufferedReader( + new InputStreamReader(localFs.open(outputFilePath))); + String line=reader.readLine(); + + assertEquals(line, "{null, 1, null, 65536, null, null, null, " + + "null, null, null, null, null, null, null}"); + + localFs.delete(outputPath, true); + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java index 108badc..cfd98f2 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java @@ -31,9 +31,9 @@ public final class ColumnProjectionUtils { public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; + public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = ""; - private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; private static final boolean READ_ALL_COLUMNS_DEFAULT = true; /** diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index ec1f18e..4f5d4fa 100644 --- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -773,4 +773,8 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) { ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed"); return ret; } + @Override + public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) { + return context.getConfiguration(); + } } diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index d0ff7d4..be57716 100644 --- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -410,4 +410,9 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) { ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed"); return ret; } + + @Override + public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) { + return context.getConfiguration(); + } } diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 54c38ee..3d778df 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -557,4 +557,9 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) { ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed"); return ret; } + + @Override + public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) { + return context.getConfiguration(); + } } 
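The column-pruning test above and the shim changes are related: ColumnProjectionUtils.READ_ALL_COLUMNS ("hive.io.file.read.all.columns") is made public so callers outside the serde package can toggle projection, and getConfiguration(JobContext) gives those callers a version-independent way to reach the job configuration. A minimal sketch of how the two fit together from a caller's point of view; the jobContext variable is assumed to be an org.apache.hadoop.mapreduce.JobContext supplied by the framework, and nothing below is part of the patch itself:

    // Resolve the configuration through the shim layer (works on the 0.20, 0.20S and 0.23 shims).
    Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext);
    // Column projection settings that the ORC readers honor.
    boolean readAllColumns =
        conf.getBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, true);
    String projectedColumnIds =
        conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");
    // e.g. "1,3" keeps only the second and fourth columns, as exercised by
    // testNewInputFormatPruning above.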
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 2b3c6c1..9e9a60d 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -520,4 +520,10 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);

   public Map<String, String> getHadoopConfNames();
+
+
+  /**
+   * Get configuration from JobContext
+   */
+  public Configuration getConfiguration(JobContext context);
 }
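Taken together, OrcNewInputFormat, OrcNewOutputFormat and OrcNewSplit make ORC usable from a plain mapreduce-API job without going through Hive's mapred wrappers. The driver below is a sketch modeled on TestNewInputOutputFormat above and is not part of the patch: the class name, input/output paths and the explicitly set hive.exec.orc.default.* values are illustrative, and it reuses the test's OrcTestMapper2 purely as an example of a mapper that emits NullWritable/OrcSerdeRow pairs built with OrcSerde.serialize().

package org.apache.hadoop.hive.ql.io.orc;  // same package as the test, so OrcSerdeRow is accessible

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrcNewFormatDriverSketch {  // illustrative driver class, not in the patch
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Writer defaults added to HiveConf by this patch; keys left unset fall
    // back to the DEFAULT_* constants read in OrcFile.
    conf.set("hive.exec.orc.default.compress", "ZLIB");
    conf.set("hive.exec.orc.default.row.index.stride", "10000");

    Job job = new Job(conf, "orc new-API example");
    job.setJarByClass(OrcNewFormatDriverSketch.class);
    // Text in (default input format), ORC out. A reading job would instead
    // call job.setInputFormatClass(OrcNewInputFormat.class) and receive
    // NullWritable/OrcStruct pairs in its mapper.
    job.setOutputFormatClass(OrcNewOutputFormat.class);
    job.setMapperClass(TestNewInputOutputFormat.OrcTestMapper2.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(OrcSerdeRow.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/orc-example/input.txt"));   // placeholder path
    FileOutputFormat.setOutputPath(job, new Path("/tmp/orc-example/output"));    // placeholder path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}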