diff --git a/orc/src/java/org/apache/orc/DataReader.java b/orc/src/java/org/apache/orc/DataReader.java
index 3a5f854..b70f26b 100644
--- a/orc/src/java/org/apache/orc/DataReader.java
+++ b/orc/src/java/org/apache/orc/DataReader.java
@@ -18,20 +18,18 @@
 package org.apache.orc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 
 /** An abstract data reader that IO formats can use to read bytes from underlying storage. */
-public interface DataReader {
+public interface DataReader extends Closeable {
 
   /** Opens the DataReader, making it ready to use. */
   void open() throws IOException;
 
-  /** Closes the DataReader. */
-  void close() throws IOException;
-
   /** Reads the data.
    *
    * Note that for the cases such as zero-copy read, caller must release the disk ranges
diff --git a/orc/src/java/org/apache/orc/DataReaderFactory.java b/orc/src/java/org/apache/orc/DataReaderFactory.java
new file mode 100644
index 0000000..ec3a0e9
--- /dev/null
+++ b/orc/src/java/org/apache/orc/DataReaderFactory.java
@@ -0,0 +1,9 @@
+package org.apache.orc;
+
+import org.apache.orc.impl.DataReaderProperties;
+
+public interface DataReaderFactory {
+
+  DataReader create(DataReaderProperties properties);
+
+}
diff --git a/orc/src/java/org/apache/orc/MetadataReaderFactory.java b/orc/src/java/org/apache/orc/MetadataReaderFactory.java
new file mode 100644
index 0000000..64629da
--- /dev/null
+++ b/orc/src/java/org/apache/orc/MetadataReaderFactory.java
@@ -0,0 +1,12 @@
+package org.apache.orc;
+
+import org.apache.orc.impl.MetadataReader;
+import org.apache.orc.impl.MetadataReaderProperties;
+
+import java.io.IOException;
+
+public interface MetadataReaderFactory {
+
+  MetadataReader create(MetadataReaderProperties properties) throws IOException;
+
+}
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
new file mode 100644
index 0000000..49f47d6
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -0,0 +1,84 @@
+package org.apache.orc.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+
+import javax.annotation.Nullable;
+
+public final class DataReaderProperties {
+
+  private final FileSystem fileSystem;
+  private final Path path;
+  private final CompressionCodec codec;
+  private final boolean zeroCopy;
+
+  private DataReaderProperties(Builder builder) {
+    this.fileSystem = builder.fileSystem;
+    this.path = builder.path;
+    this.codec = builder.codec;
+    this.zeroCopy = builder.zeroCopy;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  @Nullable
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+
+  public boolean getZeroCopy() {
+    return zeroCopy;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private FileSystem fileSystem;
+    private Path path;
+    private CompressionCodec codec;
+    private boolean zeroCopy;
+
+    private Builder() {
+
+    }
+
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    public Builder withCodec(CompressionCodec codec) {
+      this.codec = codec;
+      return this;
+    }
+
+    public Builder withZeroCopy(boolean zeroCopy) {
+      this.zeroCopy = zeroCopy;
+      return this;
+    }
+
+    public DataReaderProperties build() {
+      Preconditions.checkNotNull(fileSystem);
+      Preconditions.checkNotNull(path);
+
+      return new DataReaderProperties(this);
+    }
+
+  }
+}
diff --git a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
new file mode 100644
index 0000000..fc0d141
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
@@ -0,0 +1,14 @@
+package org.apache.orc.impl;
+
+import org.apache.orc.MetadataReaderFactory;
+
+import java.io.IOException;
+
+public final class DefaultMetadataReaderFactory implements MetadataReaderFactory {
+
+  @Override
+  public MetadataReader create(MetadataReaderProperties properties) throws IOException {
+    return new MetadataReaderImpl(properties);
+  }
+
+}
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java
index 670a81a..500239d 100644
--- a/orc/src/java/org/apache/orc/impl/MetadataReader.java
+++ b/orc/src/java/org/apache/orc/impl/MetadataReader.java
@@ -17,18 +17,17 @@
  */
 package org.apache.orc.impl;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
 
-public interface MetadataReader {
+public interface MetadataReader extends Closeable {
   OrcIndex readRowIndex(StripeInformation stripe, OrcProto.StripeFooter footer,
       boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
       OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException;
 
   OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
-
-  void close() throws IOException;
 }
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
index d0ded52..c3ea74f 100644
--- a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,17 +38,11 @@
   private final int bufferSize;
   private final int typeCount;
 
-  public MetadataReaderImpl(FileSystem fileSystem, Path path,
-      CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
-    this(fileSystem.open(path), codec, bufferSize, typeCount);
-  }
-
-  public MetadataReaderImpl(FSDataInputStream file,
-      CompressionCodec codec, int bufferSize, int typeCount) {
-    this.file = file;
-    this.codec = codec;
-    this.bufferSize = bufferSize;
-    this.typeCount = typeCount;
+  MetadataReaderImpl(MetadataReaderProperties properties) throws IOException {
+    this.file = properties.getFileSystem().open(properties.getPath());
+    this.codec = properties.getCodec();
+    this.bufferSize = properties.getBufferSize();
+    this.typeCount = properties.getTypeCount();
   }
 
   @Override
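Since MetadataReaderImpl's constructor is now package-private and takes a MetadataReaderProperties object, callers outside org.apache.orc.impl go through a MetadataReaderFactory. A minimal sketch of an alternative factory that simply delegates to the default one; the LoggingMetadataReaderFactory name and any wrapping behaviour are hypothetical and not part of this patch:

```java
import java.io.IOException;

import org.apache.orc.MetadataReaderFactory;
import org.apache.orc.impl.DefaultMetadataReaderFactory;
import org.apache.orc.impl.MetadataReader;
import org.apache.orc.impl.MetadataReaderProperties;

/** Hypothetical factory: delegates creation to the default implementation. */
public final class LoggingMetadataReaderFactory implements MetadataReaderFactory {

  private final MetadataReaderFactory delegate = new DefaultMetadataReaderFactory();

  @Override
  public MetadataReader create(MetadataReaderProperties properties) throws IOException {
    // A real implementation could wrap the returned reader to add logging,
    // metrics, or caching before handing it back to the caller.
    return delegate.create(properties);
  }
}
```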
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
new file mode 100644
index 0000000..321931c
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
@@ -0,0 +1,96 @@
+package org.apache.orc.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+
+import javax.annotation.Nullable;
+
+public final class MetadataReaderProperties {
+
+  private final FileSystem fileSystem;
+  private final Path path;
+  private final CompressionCodec codec;
+  private final int bufferSize;
+  private final int typeCount;
+
+  private MetadataReaderProperties(Builder builder) {
+    this.fileSystem = builder.fileSystem;
+    this.path = builder.path;
+    this.codec = builder.codec;
+    this.bufferSize = builder.bufferSize;
+    this.typeCount = builder.typeCount;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  @Nullable
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public int getTypeCount() {
+    return typeCount;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private FileSystem fileSystem;
+    private Path path;
+    private CompressionCodec codec;
+    private int bufferSize;
+    private int typeCount;
+
+    private Builder() {
+
+    }
+
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    public Builder withCodec(CompressionCodec codec) {
+      this.codec = codec;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withTypeCount(int typeCount) {
+      this.typeCount = typeCount;
+      return this;
+    }
+
+    public MetadataReaderProperties build() {
+      Preconditions.checkNotNull(fileSystem);
+      Preconditions.checkNotNull(path);
+
+      return new MetadataReaderProperties(this);
+    }
+
+  }
+}
diff --git a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
new file mode 100644
index 0000000..9ec08f3
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
@@ -0,0 +1,69 @@
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+public class TestDataReaderProperties {
+
+  private FileSystem mockedFileSystem = mock(FileSystem.class);
+  private Path mockedPath = mock(Path.class);
+  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
+  private boolean mockedZeroCopy = false;
+
+  @Test
+  public void testCompleteBuild() {
+    DataReaderProperties properties = DataReaderProperties.builder()
+      .withFileSystem(mockedFileSystem)
+      .withPath(mockedPath)
+      .withCodec(mockedCodec)
+      .withZeroCopy(mockedZeroCopy)
+      .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertEquals(mockedCodec, properties.getCodec());
+    assertEquals(mockedZeroCopy, properties.getZeroCopy());
+  }
+
+  @Test
+  public void testMissingNonRequiredArgs() {
+    DataReaderProperties properties = DataReaderProperties.builder()
+      .withFileSystem(mockedFileSystem)
+      .withPath(mockedPath)
+      .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertNull(properties.getCodec());
+    assertFalse(properties.getZeroCopy());
+  }
+
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testEmptyBuild() {
+    DataReaderProperties.builder().build();
+  }
+
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingPath() {
+    DataReaderProperties.builder()
+      .withFileSystem(mockedFileSystem)
+      .withCodec(mockedCodec)
+      .withZeroCopy(mockedZeroCopy)
+      .build();
+  }
+
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingFileSystem() {
+    DataReaderProperties.builder()
+      .withPath(mockedPath)
+      .withCodec(mockedCodec)
+      .withZeroCopy(mockedZeroCopy)
+      .build();
+  }
+
+}
diff --git a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
new file mode 100644
index 0000000..12e8eb4
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
@@ -0,0 +1,72 @@
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+public class TestMetadataReaderProperties {
+
+  private FileSystem mockedFileSystem = mock(FileSystem.class);
+  private Path mockedPath = mock(Path.class);
+  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
+  private int mockedBufferSize = 0;
+  private int mockedTypeCount = 0;
+
+  @Test
+  public void testCompleteBuild() {
+    MetadataReaderProperties properties = MetadataReaderProperties.builder()
+      .withFileSystem(mockedFileSystem)
+      .withPath(mockedPath)
+      .withCodec(mockedCodec)
+      .withBufferSize(mockedBufferSize)
+      .withTypeCount(mockedTypeCount)
+      .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertEquals(mockedCodec, properties.getCodec());
+    assertEquals(mockedBufferSize, properties.getBufferSize());
+    assertEquals(mockedTypeCount, properties.getTypeCount());
+  }
+
+  @Test
+  public void testMissingNonRequiredArgs() {
+    MetadataReaderProperties properties = MetadataReaderProperties.builder()
+      .withFileSystem(mockedFileSystem)
+      .withPath(mockedPath)
+      .build();
+    assertEquals(mockedFileSystem, properties.getFileSystem());
+    assertEquals(mockedPath, properties.getPath());
+    assertNull(properties.getCodec());
+    assertEquals(0, properties.getBufferSize());
+    assertEquals(0, properties.getTypeCount());
+  }
+
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testEmptyBuild() {
+    MetadataReaderProperties.builder().build();
+  }
+
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingPath() {
+    MetadataReaderProperties.builder()
+      .withFileSystem(mockedFileSystem)
+      .withCodec(mockedCodec)
+      .withBufferSize(mockedBufferSize)
+      .build();
+  }
+
+  @Test(expected = java.lang.NullPointerException.class)
+  public void testMissingFileSystem() {
+    MetadataReaderProperties.builder()
+      .withPath(mockedPath)
+      .withCodec(mockedCodec)
+      .withBufferSize(mockedBufferSize)
+      .build();
+  }
+
+}
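The two properties classes follow the same builder pattern the tests above exercise: fileSystem and path are mandatory (build() fails fast with a NullPointerException via Preconditions), codec is @Nullable, and the remaining fields fall back to Java defaults. A short usage sketch; the helper class name and the 256 KB buffer size are illustrative only:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionCodec;
import org.apache.orc.impl.DataReaderProperties;
import org.apache.orc.impl.MetadataReaderProperties;

final class PropertiesUsageSketch {

  static DataReaderProperties dataProps(FileSystem fs, Path path, CompressionCodec codec) {
    return DataReaderProperties.builder()
        .withFileSystem(fs)      // required, checked in build()
        .withPath(path)          // required, checked in build()
        .withCodec(codec)        // optional, may be null
        .withZeroCopy(false)     // optional, defaults to false
        .build();
  }

  static MetadataReaderProperties metaProps(FileSystem fs, Path path, CompressionCodec codec,
                                            int typeCount) {
    return MetadataReaderProperties.builder()
        .withFileSystem(fs)
        .withPath(path)
        .withCodec(codec)
        .withBufferSize(256 * 1024)  // illustrative value
        .withTypeCount(typeCount)
        .build();
  }
}
```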
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
new file mode 100644
index 0000000..de3471c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.orc.DataReader;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.impl.DataReaderProperties;
+
+public final class DefaultDataReaderFactory implements DataReaderFactory {
+
+  @Override
+  public DataReader create(DataReaderProperties properties) {
+    return RecordReaderUtils.createDefaultDataReader(properties);
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index a031a92..822ef14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.MetadataReaderFactory;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -33,9 +35,11 @@
 import org.apache.orc.DataReader;
 import org.apache.orc.FileMetaInfo;
 import org.apache.orc.FileMetadata;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.DefaultMetadataReaderFactory;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderImpl;
+import org.apache.orc.impl.MetadataReaderProperties;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.slf4j.Logger;
@@ -76,7 +80,8 @@
   private final List stripes;
   protected final int rowIndexStride;
   private final long contentLength, numberOfRows;
-
+  private final MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
+  private final DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
@@ -667,8 +672,20 @@ public RecordReader rowsOptions(Options options) throws IOException {
       Arrays.fill(include, true);
       options.include(include);
     }
-    return new RecordReaderImpl(this.getStripes(), fileSystem, path,
-        options, types, codec, bufferSize, rowIndexStride, conf);
+
+    return RecordReaderImpl.builder()
+        .withMetadataReaderFactory(metadataReaderFactory)
+        .withDataReaderFactory(dataReaderFactory)
+        .withStripes(this.getStripes())
+        .withFileSystem(fileSystem)
+        .withPath(path)
+        .withOptions(options)
+        .withTypes(types)
+        .withCodec(codec)
+        .withBufferSize(bufferSize)
+        .withStrideRate(rowIndexStride)
+        .withConf(conf)
+        .build();
   }
 
@@ -852,7 +869,13 @@ private int getLastIdx() {
 
   @Override
   public MetadataReader metadata() throws IOException {
-    return new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+    return metadataReaderFactory.create(MetadataReaderProperties.builder()
+        .withBufferSize(bufferSize)
+        .withCodec(codec)
+        .withFileSystem(fileSystem)
+        .withPath(path)
+        .withTypeCount(types.size())
+        .build());
   }
 
   @Override
@@ -867,7 +890,12 @@ public int getMetadataSize() {
 
   @Override
   public DataReader createDefaultDataReader(boolean useZeroCopy) {
-    return RecordReaderUtils.createDefaultDataReader(fileSystem, path, useZeroCopy, codec);
+    return dataReaderFactory.create(DataReaderProperties.builder()
+        .withFileSystem(fileSystem)
+        .withPath(path)
+        .withCodec(codec)
+        .withZeroCopy(useZeroCopy)
+        .build());
   }
 
   @Override
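ReaderImpl now obtains its readers exclusively through the two factory fields, so an alternative DataReaderFactory can be swapped in (for example via RecordReaderImpl.builder(), as the test at the end of this patch does). A minimal sketch of such a factory; the counting behaviour is hypothetical and assumes the class lives in org.apache.hadoop.hive.ql.io.orc so it can reuse DefaultDataReaderFactory:

```java
package org.apache.hadoop.hive.ql.io.orc;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.orc.DataReader;
import org.apache.orc.DataReaderFactory;
import org.apache.orc.impl.DataReaderProperties;

/** Hypothetical factory that counts creations and then delegates to the default one. */
public final class CountingDataReaderFactory implements DataReaderFactory {

  private final DataReaderFactory delegate = new DefaultDataReaderFactory();
  private final AtomicLong created = new AtomicLong();

  @Override
  public DataReader create(DataReaderProperties properties) {
    created.incrementAndGet();
    return delegate.create(properties);
  }

  public long createdCount() {
    return created.get();
  }
}
```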
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 3975409..9cfcc0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,8 +27,11 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.exception.ExceptionUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closer;
 import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.MetadataReaderFactory;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
@@ -38,11 +41,13 @@
 import org.apache.orc.DateColumnStatistics;
 import org.apache.orc.DecimalColumnStatistics;
 import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.DefaultMetadataReaderFactory;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderImpl;
 import org.apache.orc.OrcConf;
+import org.apache.orc.impl.MetadataReaderProperties;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.StreamName;
@@ -141,17 +146,99 @@ static int findColumns(String[] columnNames,
     return result;
   }
 
-  protected RecordReaderImpl(List stripes,
-                             FileSystem fileSystem,
-                             Path path,
-                             Reader.Options options,
-                             List types,
-                             CompressionCodec codec,
-                             int bufferSize,
-                             long strideRate,
-                             Configuration conf
-                             ) throws IOException {
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private Reader.Options options;
+    private CompressionCodec codec;
+    private List types;
+    private List stripes;
+    private int bufferSize;
+    private FileSystem fileSystem;
+    private Path path;
+    private Configuration conf;
+    private long strideRate;
+    private MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
+    private DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
+
+    private Builder() {
+    }
+
+    public Builder withOptions(Reader.Options options) {
+      this.options = options;
+      return this;
+    }
+
+    public Builder withCodec(CompressionCodec codec) {
+      this.codec = codec;
+      return this;
+    }
+
+    public Builder withTypes(List types) {
+      this.types = types;
+      return this;
+    }
+
+    public Builder withStripes(List stripes) {
+      this.stripes = stripes;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    public Builder withConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder withStrideRate(long strideRate) {
+      this.strideRate = strideRate;
+      return this;
+    }
+
+    public Builder withMetadataReaderFactory(MetadataReaderFactory metadataReaderFactory) {
+      this.metadataReaderFactory = metadataReaderFactory;
+      return this;
+    }
+
+    public Builder withDataReaderFactory(DataReaderFactory dataReaderFactory) {
+      this.dataReaderFactory = dataReaderFactory;
+      return this;
+    }
+
+    public RecordReaderImpl build() throws IOException {
+      Preconditions.checkNotNull(metadataReaderFactory);
+      Preconditions.checkNotNull(dataReaderFactory);
+      Preconditions.checkNotNull(options);
+      Preconditions.checkNotNull(types);
+      Preconditions.checkNotNull(stripes);
+      Preconditions.checkNotNull(fileSystem);
+      Preconditions.checkNotNull(path);
+      Preconditions.checkNotNull(conf);
+
+      return new RecordReaderImpl(this);
+    }
+  }
+
+  private RecordReaderImpl(Builder builder) throws IOException {
+    Reader.Options options = builder.options;
+    this.types = builder.types;
     TreeReaderFactory.TreeReaderSchema treeReaderSchema;
     if (options.getSchema() == null) {
       if (LOG.isInfoEnabled()) {
@@ -166,18 +253,23 @@ protected RecordReaderImpl(List stripes,
       List schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
       treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
     }
-    this.path = path;
-    this.codec = codec;
-    this.types = types;
-    this.bufferSize = bufferSize;
+    this.path = builder.path;
+    this.codec = builder.codec;
+    this.bufferSize = builder.bufferSize;
     this.included = options.getInclude();
-    this.conf = conf;
-    this.rowIndexStride = strideRate;
-    this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+    this.conf = builder.conf;
+    this.rowIndexStride = builder.strideRate;
+    this.metadata = builder.metadataReaderFactory.create(MetadataReaderProperties.builder()
+        .withFileSystem(builder.fileSystem)
+        .withPath(path)
+        .withCodec(codec)
+        .withBufferSize(bufferSize)
+        .withTypeCount(types.size())
+        .build());
     SearchArgument sarg = options.getSearchArgument();
-    if (sarg != null && strideRate != 0) {
+    if (sarg != null && builder.strideRate != 0) {
       sargApp = new SargApplier(
-          sarg, options.getColumnNames(), strideRate, types, included.length);
+          sarg, options.getColumnNames(), builder.strideRate, types, included.length);
     } else {
       sargApp = null;
     }
@@ -185,7 +277,7 @@ protected RecordReaderImpl(List stripes,
     long skippedRows = 0;
     long offset = options.getOffset();
     long maxOffset = options.getMaxOffset();
-    for(StripeInformation stripe: stripes) {
+    for(StripeInformation stripe: builder.stripes) {
       long stripeStart = stripe.getOffset();
       if (offset > stripeStart) {
         skippedRows += stripe.getNumberOfRows();
@@ -200,7 +292,12 @@ protected RecordReaderImpl(List stripes,
       zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
     }
     // TODO: we could change the ctor to pass this externally
-    this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, zeroCopy, codec);
+    this.dataReader = builder.dataReaderFactory.create(DataReaderProperties.builder()
+        .withFileSystem(builder.fileSystem)
+        .withCodec(codec)
+        .withPath(path)
+        .withZeroCopy(zeroCopy)
+        .build());
     this.dataReader.open();
 
     firstRow = skippedRows;
@@ -1119,8 +1216,16 @@ private int computeBatchSize(long targetBatchSize) {
 
   @Override
   public void close() throws IOException {
-    clearStreams();
-    dataReader.close();
+    Closer closer = Closer.create();
+    try {
+      closer.register(metadata);
+      closer.register(dataReader);
+      clearStreams();
+    } catch (IOException e) {
+      throw closer.rethrow(e);
+    } finally {
+      closer.close();
+    }
   }
 
   @Override
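The reworked close() uses Guava's Closer so that both the metadata reader and the data reader are closed even when one of them throws; the first exception is rethrown and later ones are suppressed, which is what testCloseWithException at the end of this patch relies on. A stand-alone sketch of the same idiom with two arbitrary Closeable resources (names are illustrative):

```java
import java.io.Closeable;
import java.io.IOException;

import com.google.common.io.Closer;

final class CloserIdiom {

  static void closeBoth(Closeable first, Closeable second) throws IOException {
    Closer closer = Closer.create();
    try {
      closer.register(first);
      closer.register(second);
      // ... any work that may throw goes here ...
    } catch (Throwable t) {
      throw closer.rethrow(t);      // remembers t as the primary exception
    } finally {
      closer.close();               // closes both, in reverse registration order
    }
  }
}
```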
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 8a73948..177721d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -39,6 +39,7 @@
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
+import org.apache.orc.impl.DataReaderProperties;
 import org.apache.orc.impl.DirectDecompressionCodec;
 import org.apache.orc.OrcProto;
 
@@ -60,12 +61,11 @@
     private boolean useZeroCopy;
     private CompressionCodec codec;
 
-    public DefaultDataReader(
-        FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
-      this.fs = fs;
-      this.path = path;
-      this.useZeroCopy = useZeroCopy;
-      this.codec = codec;
+    private DefaultDataReader(DataReaderProperties properties) {
+      this.fs = properties.getFileSystem();
+      this.path = properties.getPath();
+      this.useZeroCopy = properties.getZeroCopy();
+      this.codec = properties.getCodec();
     }
 
     @Override
@@ -108,9 +108,8 @@
     public void releaseBuffer(ByteBuffer buffer) {
   }
 
-  static DataReader createDefaultDataReader(
-      FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
-    return new DefaultDataReader(fs, path, useZeroCopy, codec);
+  static DataReader createDefaultDataReader(DataReaderProperties properties) {
+    return new DefaultDataReader(properties);
   }
 
   public static boolean[] findPresentStreamsByColumn(
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
index 6803abd..cc7182f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
@@ -21,11 +21,18 @@
 import static junit.framework.Assert.assertEquals;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -46,9 +53,17 @@
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.DataReaderFactory;
+import org.apache.orc.MetadataReaderFactory;
+import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.ColumnStatisticsImpl;
 import org.apache.orc.OrcProto;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.MetadataReader;
+import org.apache.orc.impl.MetadataReaderProperties;
 import org.junit.Test;
 import org.mockito.MockSettings;
 import org.mockito.Mockito;
@@ -148,16 +163,16 @@ public void testMaxLengthToReader() throws Exception {
     footer.writeTo(buffer);
     ps.writeTo(buffer);
     buffer.write(ps.getSerializedSize());
-    FileSystem fs = Mockito.mock(FileSystem.class, settings);
+    FileSystem fs = mock(FileSystem.class, settings);
     FSDataInputStream file =
         new FSDataInputStream(new BufferInStream(buffer.getData(), buffer.getLength()));
     Path p = new Path("/dir/file.orc");
-    Mockito.when(fs.open(p)).thenReturn(file);
+    when(fs.open(p)).thenReturn(file);
     OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
     options.filesystem(fs);
     options.maxLength(buffer.getLength());
-    Mockito.when(fs.getFileStatus(p))
+    when(fs.getFileStatus(p))
         .thenReturn(new FileStatus(10, false, 3, 3000, 0, p));
     Reader reader = OrcFile.createReader(p, options);
   }
@@ -165,21 +180,21 @@ public void testMaxLengthToReader() throws Exception {
   @Test
   public void testCompareToRangeInt() throws Exception {
     assertEquals(Location.BEFORE,
-      RecordReaderImpl.compareToRange(19L, 20L, 40L));
+        RecordReaderImpl.compareToRange(19L, 20L, 40L));
     assertEquals(Location.AFTER,
-      RecordReaderImpl.compareToRange(41L, 20L, 40L));
+        RecordReaderImpl.compareToRange(41L, 20L, 40L));
     assertEquals(Location.MIN,
         RecordReaderImpl.compareToRange(20L, 20L, 40L));
     assertEquals(Location.MIDDLE,
         RecordReaderImpl.compareToRange(21L, 20L, 40L));
     assertEquals(Location.MAX,
-      RecordReaderImpl.compareToRange(40L, 20L, 40L));
+        RecordReaderImpl.compareToRange(40L, 20L, 40L));
     assertEquals(Location.BEFORE,
-      RecordReaderImpl.compareToRange(0L, 1L, 1L));
+        RecordReaderImpl.compareToRange(0L, 1L, 1L));
     assertEquals(Location.MIN,
-      RecordReaderImpl.compareToRange(1L, 1L, 1L));
+        RecordReaderImpl.compareToRange(1L, 1L, 1L));
     assertEquals(Location.AFTER,
-      RecordReaderImpl.compareToRange(2L, 1L, 1L));
+        RecordReaderImpl.compareToRange(2L, 1L, 1L));
   }
 
   @Test
@@ -205,43 +220,43 @@ public void testCompareToRangeString() throws Exception {
   @Test
   public void testCompareToCharNeedConvert() throws Exception {
     assertEquals(Location.BEFORE,
-      RecordReaderImpl.compareToRange("apple", "hello", "world"));
+        RecordReaderImpl.compareToRange("apple", "hello", "world"));
     assertEquals(Location.AFTER,
-      RecordReaderImpl.compareToRange("zombie", "hello", "world"));
+        RecordReaderImpl.compareToRange("zombie", "hello", "world"));
     assertEquals(Location.MIN,
         RecordReaderImpl.compareToRange("hello", "hello", "world"));
     assertEquals(Location.MIDDLE,
         RecordReaderImpl.compareToRange("pilot", "hello", "world"));
     assertEquals(Location.MAX,
-      RecordReaderImpl.compareToRange("world", "hello", "world"));
+        RecordReaderImpl.compareToRange("world", "hello", "world"));
     assertEquals(Location.BEFORE,
-      RecordReaderImpl.compareToRange("apple", "hello", "hello"));
+        RecordReaderImpl.compareToRange("apple", "hello", "hello"));
     assertEquals(Location.MIN,
-      RecordReaderImpl.compareToRange("hello", "hello", "hello"));
+        RecordReaderImpl.compareToRange("hello", "hello", "hello"));
     assertEquals(Location.AFTER,
-      RecordReaderImpl.compareToRange("zombie", "hello", "hello"));
+        RecordReaderImpl.compareToRange("zombie", "hello", "hello"));
   }
 
   @Test
   public void testGetMin() throws Exception {
     assertEquals(10L, RecordReaderImpl.getMin(
-      ColumnStatisticsImpl.deserialize(createIntStats(10L, 100L))));
+        ColumnStatisticsImpl.deserialize(createIntStats(10L, 100L))));
     assertEquals(10.0d, RecordReaderImpl.getMin(ColumnStatisticsImpl.deserialize(
-      OrcProto.ColumnStatistics.newBuilder()
-        .setDoubleStatistics(OrcProto.DoubleStatistics.newBuilder()
-          .setMinimum(10.0d).setMaximum(100.0d).build()).build())));
+        OrcProto.ColumnStatistics.newBuilder()
+            .setDoubleStatistics(OrcProto.DoubleStatistics.newBuilder()
+                .setMinimum(10.0d).setMaximum(100.0d).build()).build())));
     assertEquals(null, RecordReaderImpl.getMin(ColumnStatisticsImpl.deserialize(
-      OrcProto.ColumnStatistics.newBuilder()
-        .setStringStatistics(OrcProto.StringStatistics.newBuilder().build())
-        .build())));
+        OrcProto.ColumnStatistics.newBuilder()
+            .setStringStatistics(OrcProto.StringStatistics.newBuilder().build())
+            .build())));
     assertEquals("a", RecordReaderImpl.getMin(ColumnStatisticsImpl.deserialize(
-      OrcProto.ColumnStatistics.newBuilder()
-        .setStringStatistics(OrcProto.StringStatistics.newBuilder()
-          .setMinimum("a").setMaximum("b").build()).build())));
+        OrcProto.ColumnStatistics.newBuilder()
+            .setStringStatistics(OrcProto.StringStatistics.newBuilder()
+                .setMinimum("a").setMaximum("b").build()).build())));
     assertEquals("hello", RecordReaderImpl.getMin(ColumnStatisticsImpl
-      .deserialize(createStringStats("hello", "world"))));
+        .deserialize(createStringStats("hello", "world"))));
     assertEquals(HiveDecimal.create("111.1"), RecordReaderImpl.getMin(ColumnStatisticsImpl
-      .deserialize(createDecimalStats("111.1", "112.1"))));
+        .deserialize(createDecimalStats("111.1", "112.1"))));
   }
 
   private static OrcProto.ColumnStatistics createIntStats(Long min,
@@ -262,7 +277,7 @@ public void testGetMin() throws Exception {
     OrcProto.BucketStatistics.Builder boolStats = OrcProto.BucketStatistics.newBuilder();
     boolStats.addCount(trueCount);
     return OrcProto.ColumnStatistics.newBuilder().setNumberOfValues(n).setBucketStatistics(
-      boolStats.build()).build();
+        boolStats.build()).build();
   }
 
   private static OrcProto.ColumnStatistics createIntStats(int min, int max) {
@@ -341,9 +356,9 @@ public void testGetMax() throws Exception {
             .setStringStatistics(OrcProto.StringStatistics.newBuilder()
                 .setMinimum("a").setMaximum("b").build()).build())));
     assertEquals("world", RecordReaderImpl.getMax(ColumnStatisticsImpl
-      .deserialize(createStringStats("hello", "world"))));
+        .deserialize(createStringStats("hello", "world"))));
     assertEquals(HiveDecimal.create("112.1"), RecordReaderImpl.getMax(ColumnStatisticsImpl
-      .deserialize(createDecimalStats("111.1", "112.1"))));
+        .deserialize(createDecimalStats("111.1", "112.1"))));
   }
 
   @Test
@@ -365,15 +380,15 @@ public void testPredEvalWithBooleanStats() throws Exception {
     pred = TestSearchArgumentImpl.createPredicateLeaf(
         PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.BOOLEAN, "x", false, null);
     assertEquals(TruthValue.NO,
-      RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 10), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 10), pred, null));
     assertEquals(TruthValue.YES_NO,
-      RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 0), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createBooleanStats(10, 0), pred, null));
   }
 
   @Test
   public void testPredEvalWithIntStats() throws Exception {
     PredicateLeaf pred = TestSearchArgumentImpl.createPredicateLeaf(
-      PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.LONG, "x", 15L, null);
+        PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.LONG, "x", 15L, null);
     assertEquals(TruthValue.YES_NO,
         RecordReaderImpl.evaluatePredicateProto(createIntStats(10, 100), pred, null));
 
@@ -402,7 +417,7 @@ public void testPredEvalWithIntStats() throws Exception {
     pred = TestSearchArgumentImpl.createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
         PredicateLeaf.Type.TIMESTAMP, "x", new Timestamp(15), null);
     assertEquals(TruthValue.YES_NO,
-      RecordReaderImpl.evaluatePredicateProto(createIntStats(10, 100), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createIntStats(10, 100), pred, null));
   }
 
   @Test
@@ -620,7 +635,7 @@ public void testPredEvalWithTimestampStats() throws Exception {
         RecordReaderImpl.evaluatePredicateProto(createTimestampStats(10, 100), pred, null));
     assertEquals(TruthValue.YES_NO,
         RecordReaderImpl.evaluatePredicateProto(createTimestampStats(10 * 24L * 60L * 60L * 1000L,
-          100 * 24L * 60L * 60L * 1000L), pred, null));
+            100 * 24L * 60L * 60L * 1000L), pred, null));
 
     pred = TestSearchArgumentImpl.createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
         PredicateLeaf.Type.DECIMAL, "x", new HiveDecimalWritable("15"), null);
@@ -738,9 +753,9 @@ public void testBetween() throws Exception {
     assertEquals(TruthValue.NO_NULL,
         RecordReaderImpl.evaluatePredicateProto(createIntStats(0L, 5L), pred, null));
     assertEquals(TruthValue.NO_NULL,
-      RecordReaderImpl.evaluatePredicateProto(createIntStats(30L, 40L), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createIntStats(30L, 40L), pred, null));
     assertEquals(TruthValue.YES_NO_NULL,
-      RecordReaderImpl.evaluatePredicateProto(createIntStats(5L, 15L), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createIntStats(5L, 15L), pred, null));
     assertEquals(TruthValue.YES_NO_NULL,
         RecordReaderImpl.evaluatePredicateProto(createIntStats(15L, 25L), pred, null));
     assertEquals(TruthValue.YES_NO_NULL,
@@ -876,10 +891,10 @@ public void testBetweenWithNullInStats() throws Exception {
     assertEquals(TruthValue.YES_NO_NULL, // before & min
        RecordReaderImpl.evaluatePredicateProto(createStringStats("f", "g", true), pred, null));
     assertEquals(TruthValue.YES_NO_NULL, // before & middle
-      RecordReaderImpl.evaluatePredicateProto(createStringStats("e", "g", true), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createStringStats("e", "g", true), pred, null));
     assertEquals(TruthValue.YES_NULL, // min & after
-      RecordReaderImpl.evaluatePredicateProto(createStringStats("c", "e", true), pred, null));
+        RecordReaderImpl.evaluatePredicateProto(createStringStats("c", "e", true), pred, null));
     assertEquals(TruthValue.YES_NULL, // min & max
        RecordReaderImpl.evaluatePredicateProto(createStringStats("c", "f", true), pred, null));
     assertEquals(TruthValue.YES_NO_NULL, // min & middle
@@ -1623,4 +1638,56 @@ public void testNullsInBloomFilter() throws Exception {
     bf.addString(HiveDecimal.create(15).toString());
     assertEquals(TruthValue.YES_NO_NULL, RecordReaderImpl.evaluatePredicate(cs, pred, bf));
   }
+
+  @Test
+  public void testClose() throws Exception {
+    DataReader mockedDataReader = mock(DataReader.class);
+    MetadataReader mockedMetadataReader = mock(MetadataReader.class);
+
+    closeMockedRecordReader(mockedDataReader, mockedMetadataReader);
+
+    verify(mockedDataReader, atLeastOnce()).close();
+    verify(mockedMetadataReader, atLeastOnce()).close();
+  }
+
+  @Test
+  public void testCloseWithException() throws Exception {
+    DataReader mockedDataReader = mock(DataReader.class);
+    MetadataReader mockedMetadataReader = mock(MetadataReader.class);
+    doThrow(IOException.class).when(mockedDataReader).close();
+
+    try {
+      closeMockedRecordReader(mockedDataReader, mockedMetadataReader);
+      fail("Exception should have been thrown when Record Reader was closed");
+    } catch (IOException expected) {
+
+    }
+
+    verify(mockedMetadataReader, atLeastOnce()).close();
+    verify(mockedDataReader, atLeastOnce()).close();
+  }
+
+  private void closeMockedRecordReader(DataReader mockedDataReader,
+                                       MetadataReader mockedMetadataReader) throws IOException {
+    DataReaderFactory mockedDataReaderFactory = mock(DataReaderFactory.class);
+    MetadataReaderFactory mockedMetadataReaderFactory = mock(MetadataReaderFactory.class);
+    when(mockedDataReaderFactory.create(any(DataReaderProperties.class))).thenReturn(mockedDataReader);
+    when(mockedMetadataReaderFactory.create(any(MetadataReaderProperties.class))).thenReturn(mockedMetadataReader);
+
+    RecordReader recordReader = RecordReaderImpl.builder()
+        .withBufferSize(0)
+        .withCodec(mock(CompressionCodec.class))
+        .withConf(mock(Configuration.class))
+        .withFileSystem(mock(FileSystem.class))
+        .withOptions(mock(Reader.Options.class))
+        .withPath(mock(Path.class))
+        .withStrideRate(0)
+        .withStripes(Collections.singletonList(mock(StripeInformation.class)))
+        .withTypes(Collections.singletonList(OrcProto.Type.getDefaultInstance()))
+        .withDataReaderFactory(mockedDataReaderFactory)
+        .withMetadataReaderFactory(mockedMetadataReaderFactory)
+        .build();
+
+    recordReader.close();
+  }
 }
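With DataReader and MetadataReader both extending Closeable, code that obtains them directly from a Reader can also lean on try-with-resources instead of manual close() calls. A minimal sketch, assuming the ql Reader interface used above; note that a DataReader still needs open() before it is used:

```java
import java.io.IOException;

import org.apache.orc.DataReader;
import org.apache.orc.impl.MetadataReader;

final class TryWithResourcesSketch {

  static void useReaders(Reader reader) throws IOException {
    try (MetadataReader metadata = reader.metadata();
         DataReader data = reader.createDefaultDataReader(false)) {
      data.open();
      // ... read stripe footers, row indexes, and data ranges here ...
    } // both readers are closed here, even if an exception was thrown
  }
}
```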