diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
new file mode 100644
index 0000000..3a47673
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java
@@ -0,0 +1,154 @@
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+
+public abstract class AbstractTestParquetDirect {
+
+  public static FileSystem localFS = null;
+
+  @BeforeClass
+  public static void initializeFS() throws IOException {
+    localFS = FileSystem.getLocal(new Configuration());
+  }
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+
+  public interface DirectWriter {
+    public void write(RecordConsumer consumer);
+  }
+
+  public static class DirectWriteSupport extends WriteSupport<Void> {
+    private RecordConsumer recordConsumer;
+    private final MessageType type;
+    private final DirectWriter writer;
+    private final Map<String, String> metadata;
+
+    private DirectWriteSupport(MessageType type, DirectWriter writer,
+        Map<String, String> metadata) {
+      this.type = type;
+      this.writer = writer;
+      this.metadata = metadata;
+    }
+
+    @Override
+    public WriteContext init(Configuration configuration) {
+      return new WriteContext(type, metadata);
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+      this.recordConsumer = recordConsumer;
+    }
+
+    @Override
+    public void write(Void record) {
+      writer.write(recordConsumer);
+    }
+  }
+
+  public Path writeDirect(String name, MessageType type, DirectWriter writer)
+      throws IOException {
+    File temp = tempDir.newFile(name + ".parquet");
+    temp.deleteOnExit();
+    temp.delete();
+
+    Path path = new Path(temp.getPath());
+
+    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(path,
+        new DirectWriteSupport(type, writer, new HashMap<String, String>()));
+    parquetWriter.write(null);
+    parquetWriter.close();
+
+    return path;
+  }
+
+  public static ArrayWritable record(Writable... fields) {
+    return new ArrayWritable(Writable.class, fields);
+  }
+
+  public static ArrayWritable list(Writable... elements) {
+    // the ObjectInspector for array and map expects an extra layer
+    return new ArrayWritable(ArrayWritable.class, new ArrayWritable[] {
+        new ArrayWritable(Writable.class, elements)
+    });
+  }
+
+  public static String toString(ArrayWritable arrayWritable) {
+    Writable[] writables = arrayWritable.get();
+    String[] strings = new String[writables.length];
+    for (int i = 0; i < writables.length; i += 1) {
+      if (writables[i] instanceof ArrayWritable) {
+        strings[i] = toString((ArrayWritable) writables[i]);
+      } else {
+        strings[i] = String.valueOf(writables[i]);
+      }
+    }
+    return Arrays.toString(strings);
+  }
+
+  public static void assertEquals(String message, ArrayWritable expected,
+      ArrayWritable actual) {
+    Assert.assertEquals(message, toString(expected), toString(actual));
+  }
+
+  public static List<ArrayWritable> read(Path parquetFile) throws IOException {
+    List<ArrayWritable> records = new ArrayList<ArrayWritable>();
+
+    RecordReader<Void, ArrayWritable> reader = new MapredParquetInputFormat().
+        getRecordReader(new FileSplit(
+            parquetFile, 0, fileLength(parquetFile), (String[]) null),
+            new JobConf(), null);
+
+    Void alwaysNull = reader.createKey();
+    ArrayWritable record = reader.createValue();
+    while (reader.next(alwaysNull, record)) {
+      records.add(record);
+      record = reader.createValue(); // a new value so the last isn't clobbered
+    }
+
+    return records;
+  }
+
+  public static long fileLength(Path localFile) throws IOException {
+    return localFS.getFileStatus(localFile).getLen();
+  }
+
+  private static final Joiner COMMA = Joiner.on(",");
+  public void deserialize(Writable record, List<String> columnNames,
+      List<String> columnTypes) throws Exception {
+    ParquetHiveSerDe serde = new ParquetHiveSerDe();
+    Properties props = new Properties();
+    props.setProperty(serdeConstants.LIST_COLUMNS, COMMA.join(columnNames));
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, COMMA.join(columnTypes));
+    serde.initialize(null, props);
+    serde.deserialize(record);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java
index 6e551de..f7f3e57 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestArrayCompatibility.java
@@ -19,7 +19,7 @@
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
-public class TestArrayCompatibility extends TestParquetDirect {
+public class TestArrayCompatibility extends AbstractTestParquetDirect {
 
   @Test
   public void testUnannotatedListOfPrimitives() throws Exception {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapStructures.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapStructures.java
index ff94484..ca48050 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapStructures.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapStructures.java
@@ -16,7 +16,7 @@
 import static parquet.schema.OriginalType.*;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 
-public class TestMapStructures extends TestParquetDirect {
+public class TestMapStructures extends AbstractTestParquetDirect {
 
   @Test
   public void testStringMapRequiredPrimitive() throws Exception {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetDirect.java
deleted file mode 100644
index c5525df..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetDirect.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.hadoop.hive.ql.io.parquet;
-
-import com.google.common.base.Joiner;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-
-public class TestParquetDirect {
-
-  public static FileSystem localFS = null;
-
-  @BeforeClass
-  public static void initializeFS() throws IOException {
-    localFS = FileSystem.getLocal(new Configuration());
-  }
-
-  @Rule
-  public final TemporaryFolder tempDir = new TemporaryFolder();
-
-
-  public interface DirectWriter {
-    public void write(RecordConsumer consumer);
-  }
-
-  public static class DirectWriteSupport extends WriteSupport<Void> {
-    private RecordConsumer recordConsumer;
-    private final MessageType type;
-    private final DirectWriter writer;
-    private final Map<String, String> metadata;
-
-    private DirectWriteSupport(MessageType type, DirectWriter writer,
-        Map<String, String> metadata) {
-      this.type = type;
-      this.writer = writer;
-      this.metadata = metadata;
-    }
-
-    @Override
-    public WriteContext init(Configuration configuration) {
-      return new WriteContext(type, metadata);
-    }
-
-    @Override
-    public void prepareForWrite(RecordConsumer recordConsumer) {
-      this.recordConsumer = recordConsumer;
-    }
-
-    @Override
-    public void write(Void record) {
-      writer.write(recordConsumer);
-    }
-  }
-
-  public Path writeDirect(String name, MessageType type, DirectWriter writer)
-      throws IOException {
-    File temp = tempDir.newFile(name + ".parquet");
-    temp.deleteOnExit();
-    temp.delete();
-
-    Path path = new Path(temp.getPath());
-
-    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(path,
-        new DirectWriteSupport(type, writer, new HashMap<String, String>()));
-    parquetWriter.write(null);
-    parquetWriter.close();
-
-    return path;
-  }
-
-  public static ArrayWritable record(Writable... fields) {
-    return new ArrayWritable(Writable.class, fields);
-  }
-
-  public static ArrayWritable list(Writable... elements) {
-    // the ObjectInspector for array and map expects an extra layer
-    return new ArrayWritable(ArrayWritable.class, new ArrayWritable[] {
-        new ArrayWritable(Writable.class, elements)
-    });
-  }
-
-  public static String toString(ArrayWritable arrayWritable) {
-    Writable[] writables = arrayWritable.get();
-    String[] strings = new String[writables.length];
-    for (int i = 0; i < writables.length; i += 1) {
-      if (writables[i] instanceof ArrayWritable) {
-        strings[i] = toString((ArrayWritable) writables[i]);
-      } else {
-        strings[i] = String.valueOf(writables[i]);
-      }
-    }
-    return Arrays.toString(strings);
-  }
-
-  public static void assertEquals(String message, ArrayWritable expected,
-      ArrayWritable actual) {
-    Assert.assertEquals(message, toString(expected), toString(actual));
-  }
-
-  public static List<ArrayWritable> read(Path parquetFile) throws IOException {
-    List<ArrayWritable> records = new ArrayList<ArrayWritable>();
-
-    RecordReader<Void, ArrayWritable> reader = new MapredParquetInputFormat().
-        getRecordReader(new FileSplit(
-            parquetFile, 0, fileLength(parquetFile), (String[]) null),
-            new JobConf(), null);
-
-    Void alwaysNull = reader.createKey();
-    ArrayWritable record = reader.createValue();
-    while (reader.next(alwaysNull, record)) {
-      records.add(record);
-      record = reader.createValue(); // a new value so the last isn't clobbered
-    }
-
-    return records;
-  }
-
-  public static long fileLength(Path localFile) throws IOException {
-    return localFS.getFileStatus(localFile).getLen();
-  }
-
-  private static final Joiner COMMA = Joiner.on(",");
-  public void deserialize(Writable record, List<String> columnNames,
-      List<String> columnTypes) throws Exception {
-    ParquetHiveSerDe serde = new ParquetHiveSerDe();
-    Properties props = new Properties();
-    props.setProperty(serdeConstants.LIST_COLUMNS, COMMA.join(columnNames));
-    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, COMMA.join(columnTypes));
-    serde.initialize(null, props);
-    serde.deserialize(record);
-  }
-}
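
Note (not part of the patch): a minimal sketch of how a concrete test is expected to drive the helpers added above. writeDirect() takes a Parquet MessageType plus a DirectWriter that emits events on the low-level RecordConsumer, and read() pulls the rows back through MapredParquetInputFormat so they can be compared as Writable trees with assertEquals(). The class name and schema below are hypothetical; the real usage lives in TestArrayCompatibility and TestMapStructures, which extend the same base class.

// Illustrative sketch only, assuming a single required int32 column.
package org.apache.hadoop.hive.ql.io.parquet;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.junit.Test;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class TestParquetDirectExample extends AbstractTestParquetDirect {

  @Test
  public void testSingleIntColumn() throws Exception {
    // write one record directly, bypassing Hive's Parquet writer
    MessageType type = MessageTypeParser.parseMessageType(
        "message SingleIntRecord { required int32 x; }");

    Path file = writeDirect("SingleIntRecord", type, new DirectWriter() {
      @Override
      public void write(RecordConsumer consumer) {
        consumer.startMessage();
        consumer.startField("x", 0);
        consumer.addInteger(34);
        consumer.endField("x", 0);
        consumer.endMessage();
      }
    });

    // read it back through the Hive input format and compare Writable structures
    ArrayWritable expected = record(new IntWritable(34));
    assertEquals("Should read back the single int value",
        expected, read(file).get(0));
  }
}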