From 22affd25064eb60298635cae80387560bbc8af59 Mon Sep 17 00:00:00 2001 From: "David Z. Chen" Date: Tue, 17 Jun 2014 13:01:43 -0700 Subject: [PATCH] HIVE-7286: Parameterize HCatMapReduceTest for testing against all Hive storage formats. --- .gitignore | 1 + .../fileformats/TestOrcDynamicPartitioned.java | 52 ------ .../hive/hcatalog/mapreduce/HCatMapReduceTest.java | 167 +++++++++++--------- .../hive/hcatalog/mapreduce/StorageFormats.java | 174 +++++++++++++++++++++ .../mapreduce/TestHCatDynamicPartitioned.java | 7 +- .../TestHCatExternalDynamicPartitioned.java | 16 +- .../mapreduce/TestHCatExternalNonPartitioned.java | 5 + .../mapreduce/TestHCatExternalPartitioned.java | 5 + .../TestHCatMutableDynamicPartitioned.java | 5 + .../mapreduce/TestHCatMutableNonPartitioned.java | 6 +- .../mapreduce/TestHCatMutablePartitioned.java | 5 + .../hcatalog/mapreduce/TestHCatNonPartitioned.java | 10 +- .../hcatalog/mapreduce/TestHCatPartitioned.java | 9 +- .../storage/AvroStorageCustomHandler.java | 68 ++++++++ .../mapreduce/storage/StorageCustomHandler.java | 36 +++++ .../org/apache/hadoop/hive/ql/io/IOConstants.java | 1 + .../hadoop/hive/serde2/avro/TypeInfoToSchema.java | 133 ++++++++++++++++ .../hive/serde2/avro/TestTypeInfoToSchema.java | 74 +++++++++ serde/src/test/resources/alltypes.avsc | 48 ++++++ 19 files changed, 672 insertions(+), 150 deletions(-) delete mode 100644 hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java create mode 100644 hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/StorageFormats.java create mode 100644 hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java create mode 100644 hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java create mode 100644 serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java create mode 100644 serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java create mode 100644 serde/src/test/resources/alltypes.avsc diff --git a/.gitignore b/.gitignore index d0c97d1..fa9773a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ common/src/gen *.iml *.ipr *.iws +*.swp derby.log datanucleus.log .arc diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java deleted file mode 100644 index f68dbb8..0000000 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-
-package org.apache.hive.hcatalog.fileformats;
-
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
-import org.apache.hive.hcatalog.mapreduce.TestHCatDynamicPartitioned;
-import org.junit.BeforeClass;
-
-public class TestOrcDynamicPartitioned extends TestHCatDynamicPartitioned {
-
-  @BeforeClass
-  public static void generateInputData() throws Exception {
-    tableName = "testOrcDynamicPartitionedTable";
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-    generateDataColumns();
-  }
-
-  @Override
-  protected String inputFormat() {
-    return OrcInputFormat.class.getName();
-  }
-
-  @Override
-  protected String outputFormat() {
-    return OrcOutputFormat.class.getName();
-  }
-
-  @Override
-  protected String serdeClass() {
-    return OrcSerde.class.getName();
-  }
-
-}
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
index 9ddc3a6..a5c0c81 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -21,12 +21,11 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,9 +39,6 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -53,15 +49,24 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.DefaultHCatRecord;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.storage.StorageCustomHandler;
+
+import junit.framework.Assert;
+
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,24 +74,54 @@
 /**
  * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads
- * it back using HCatInputFormat, checks the column values and counts.
+ * it back using HCatInputFormat, checks the column values and counts. Subclasses of this
+ * class exercise different partitioning schemes.
+ *
+ * This is a parameterized test that runs HCatOutputFormat and HCatInputFormat against Hive's
+ * native storage formats enumerated by {@link org.apache.hive.hcatalog.mapreduce.StorageFormats}.
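+ *
+ * As an illustration (hypothetical values, not prescribed by this patch), JUnit instantiates
+ * each concrete subclass once per parameter row returned by StorageFormats.asParameters(),
+ * roughly equivalent to:
+ * <pre>
+ *   new TestHCatPartitioned("orc", OrcSerde.class.getName(),
+ *       OrcInputFormat.class.getName(), OrcOutputFormat.class.getName(), null);
+ * </pre>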
  */
+@RunWith(Parameterized.class)
 public abstract class HCatMapReduceTest extends HCatBaseTest {
-
   private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
+
   protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
-  protected static String tableName = "testHCatMapReduceTable";
+  protected static final String TABLE_NAME = "testHCatMapReduceTable";
 
   private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
   private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
+  private static FileSystem fs;
+  private String externalTableLocation = null;
+  protected String tableName;
+  protected String serdeClass;
+  protected String inputFormatClass;
+  protected String outputFormatClass;
+  protected StorageCustomHandler storageCustomHandler;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return StorageFormats.asParameters();
+  }
+
+  /**
+   * Test constructor that sets the storage format class names provided by the test parameters.
+   */
+  public HCatMapReduceTest(String name, String serdeClass, String inputFormatClass,
+      String outputFormatClass, String storageCustomHandlerClass) throws Exception {
+    this.serdeClass = serdeClass;
+    this.inputFormatClass = inputFormatClass;
+    this.outputFormatClass = outputFormatClass;
+    if (storageCustomHandlerClass != null) {
+      this.storageCustomHandler = (StorageCustomHandler) Class.forName(storageCustomHandlerClass)
+          .newInstance();
+    }
+    this.tableName = TABLE_NAME + "_" + name;
+  }
+
   protected abstract List<FieldSchema> getPartitionKeys();
 
   protected abstract List<FieldSchema> getTableColumns();
 
-  private static FileSystem fs;
-  private String externalTableLocation = null;
-
   protected Boolean isTableExternal() {
     return false;
   }
@@ -95,18 +130,6 @@ protected boolean isTableImmutable() {
     return true;
   }
 
-  protected String inputFormat() {
-    return RCFileInputFormat.class.getName();
-  }
-
-  protected String outputFormat() {
-    return RCFileOutputFormat.class.getName();
-  }
-
-  protected String serdeClass() {
-    return ColumnarSerDe.class.getName();
-  }
-
   @BeforeClass
   public static void setUpOneTime() throws Exception {
     fs = new LocalFileSystem();
@@ -143,13 +166,16 @@ public void deleteTable() throws Exception {
 
   @Before
   public void createTable() throws Exception {
-    String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+    // Use JUnit's Assume to skip running this fixture against any storage format whose
+    // SerDe is in the disabled SerDes list.
+    Assume.assumeTrue(!StorageFormats.DISABLED_SERDES.contains(serdeClass));
 
+    String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
     try {
       client.dropTable(databaseName, tableName);
     } catch (Exception e) {
-    } //can fail with NoSuchObjectException
-
+      // Can fail with NoSuchObjectException.
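+      // The drop is best-effort cleanup: on the first run against a given storage format the
+      // table does not exist yet, so this exception is expected and safe to ignore.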
+    }
 
     Table tbl = new Table();
     tbl.setDbName(databaseName);
@@ -160,10 +186,9 @@ public void createTable() throws Exception {
       tbl.setTableType(TableType.MANAGED_TABLE.toString());
     }
 
     StorageDescriptor sd = new StorageDescriptor();
-
     sd.setCols(getTableColumns());
-    tbl.setPartitionKeys(getPartitionKeys());
 
+    tbl.setPartitionKeys(getPartitionKeys());
     tbl.setSd(sd);
 
     sd.setBucketCols(new ArrayList<String>(2));
@@ -171,12 +196,12 @@ public void createTable() throws Exception {
     sd.getSerdeInfo().setName(tbl.getTableName());
     sd.getSerdeInfo().setParameters(new HashMap<String, String>());
     sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-    if (isTableExternal()){
+    if (isTableExternal()) {
       sd.getSerdeInfo().getParameters().put("EXTERNAL", "TRUE");
     }
-    sd.getSerdeInfo().setSerializationLib(serdeClass());
-    sd.setInputFormat(inputFormat());
-    sd.setOutputFormat(outputFormat());
+    sd.getSerdeInfo().setSerializationLib(serdeClass);
+    sd.setInputFormat(inputFormatClass);
+    sd.setOutputFormat(outputFormatClass);
 
     Map<String, String> tableParams = new HashMap<String, String>();
     if (isTableExternal()) {
@@ -186,72 +211,66 @@ public void createTable() throws Exception {
       tableParams.put(hive_metastoreConstants.IS_IMMUTABLE,"true");
     }
     tbl.setParameters(tableParams);
+    if (storageCustomHandler != null) {
+      storageCustomHandler.setCustomTableProperties(tbl);
+    }
 
     client.createTable(tbl);
   }
 
-  //Create test input file with specified number of rows
+  /*
+   * Create test input file with specified number of rows.
+   */
   private void createInputFile(Path path, int rowCount) throws IOException {
-
     if (fs.exists(path)) {
       fs.delete(path, true);
     }
 
     FSDataOutputStream os = fs.create(path);
-
     for (int i = 0; i < rowCount; i++) {
       os.writeChars(i + "\n");
     }
-
     os.close();
   }
 
-  public static class MapCreate extends
-      Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
-
-    static int writeCount = 0; //test will be in local mode
+  public static class MapCreate extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+    // Test will be in local mode.
+    static int writeCount = 0;
 
     @Override
-    public void map(LongWritable key, Text value, Context context
-    ) throws IOException, InterruptedException {
-      {
-        try {
-          HCatRecord rec = writeRecords.get(writeCount);
-          context.write(null, rec);
-          writeCount++;
-
-        } catch (Exception e) {
-
-          e.printStackTrace(System.err); //print since otherwise exception is lost
-          throw new IOException(e);
-        }
-      }
-    }
-  }
-
-  public static class MapRead extends
-      Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      try {
+        HCatRecord rec = writeRecords.get(writeCount);
+        context.write(null, rec);
+        writeCount++;
+      } catch (Exception e) {
+        // Print since otherwise exception is lost.
+        e.printStackTrace(System.err);
+        throw new IOException(e);
+      }
+    }
+  }
 
+  public static class MapRead extends Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
     static int readCount = 0; //test will be in local mode
 
     @Override
-    public void map(WritableComparable key, HCatRecord value, Context context
-    ) throws IOException, InterruptedException {
-      {
-        try {
-          readRecords.add(value);
-          readCount++;
-        } catch (Exception e) {
-          e.printStackTrace(); //print since otherwise exception is lost
-          throw new IOException(e);
-        }
-      }
-    }
-  }
+    public void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      try {
+        readRecords.add(value);
+        readCount++;
+      } catch (Exception e) {
+        // Print since otherwise exception is lost.
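+        // Rethrow as an IOException so the MapReduce framework fails the task instead of
+        // silently dropping the record.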
+        e.printStackTrace();
+        throw new IOException(e);
+      }
     }
   }
 
-  Job runMRCreate(Map<String, String> partitionValues,
-      List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-      int writeCount, boolean assertWrite) throws Exception {
+  Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
+      List<HCatRecord> records, int writeCount, boolean assertWrite) throws Exception {
     return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite,
         true, null);
   }
@@ -267,10 +286,9 @@ Job runMRCreate(Map<String, String> partitionValues,
    * @return
    * @throws Exception
    */
-  Job runMRCreate(Map<String, String> partitionValues,
-      List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-      int writeCount, boolean assertWrite, boolean asSingleMapTask,
-      String customDynamicPathPattern) throws Exception {
+  Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
+      List<HCatRecord> records, int writeCount, boolean assertWrite, boolean asSingleMapTask,
+      String customDynamicPathPattern) throws Exception {
 
     writeRecords = records;
     MapCreate.writeCount = 0;
@@ -355,7 +373,6 @@ Job runMRCreate(Map<String, String> partitionValues,
    * @throws Exception
    */
   List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
-
     MapRead.readCount = 0;
     readRecords.clear();
@@ -388,9 +405,7 @@
     return readRecords;
   }
 
-
   protected HCatSchema getTableSchema() throws Exception {
-
     Configuration conf = new Configuration();
     Job job = new Job(conf, "hcat mapreduce read schema test");
     job.setJarByClass(this.getClass());
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/StorageFormats.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/StorageFormats.java
new file mode 100644
index 0000000..4ba674a
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/StorageFormats.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.hive.hcatalog.mapreduce; + +import com.google.common.collect.ImmutableSet; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.ServiceLoader; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.ql.io.RCFileStorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat; +import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; + +import org.apache.hive.hcatalog.mapreduce.storage.AvroStorageCustomHandler; +import org.apache.hive.hcatalog.mapreduce.storage.StorageCustomHandler; + +import static org.junit.Assert.assertTrue; + +/** + * Utility class for enumerating Hive native storage formats for testing. Native Storage formats + * are registered via {@link org.apache.hadoop.hive.ql.io.StorageFormatDescriptor}. + */ +public class StorageFormats { + /** + * Table of additional storage formats. These are SerDes or combinations of SerDe with + * InputFormat and OutputFormat that are not registered as a native Hive storage format. + * + * Each row in this table has the following fields: + * - formatName - A string name for the storage format. This is used to give the table created + * for the test a unique name. + * - serdeClass - The name of the SerDe class used by the storage format. + * - inputFormatClass - The name of the InputFormat class. + * - outputFormatClass - The name of the OutputFormat class. + * - storageCustomHandlerClass - The name of the StorageCustomHandler class. See + * {@link org.apache.hadoop.hive.hcatalog.mapreduce.storage.StorageCustomHandler}. + */ + public static final Object[][] ADDITIONAL_STORAGE_FORMATS = new Object[][] { + { + "rcfile_columnar", + ColumnarSerDe.class.getName(), + RCFileInputFormat.class.getName(), + RCFileOutputFormat.class.getName(), + null, + }, { + "avro", + AvroSerDe.class.getName(), + AvroContainerInputFormat.class.getName(), + AvroContainerOutputFormat.class.getName(), + AvroStorageCustomHandler.class.getName(), + } + }; + + /** + * List of SerDe classes that the HCatalog core tests will not be run against. + */ + public static final Set DISABLED_SERDES = ImmutableSet.of( + AvroSerDe.class.getName(), + ParquetHiveSerDe.class.getName()); + + /** + * Create an array of Objects used to populate the test paramters. + * + * @param name Name of the storage format. + * @param serdeClass Name of the SerDe class. + * @param inputFormatClass Name of the InputFormat class. + * @param outputFormatClass Name of the OutputFormat class. + * @return Object array containing the arguments. + */ + protected static Object[] createTestArguments(String name, String serdeClass, + String inputFormatClass, String outputFormatClass) { + return createTestArguments(name, serdeClass, inputFormatClass, outputFormatClass, null); + } + + /** + * Create an array of Objects used to populate the test paramters. + * + * @param name Name of the storage format. + * @param serdeClass Name of the SerDe class. 
+   * @param inputFormatClass Name of the InputFormat class.
+   * @param outputFormatClass Name of the OutputFormat class.
+   * @param storageCustomHandlerClass Name of the StorageCustomHandler class.
+   * @return Object array containing the arguments.
+   */
+  protected static Object[] createTestArguments(String name, String serdeClass,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) {
+    Object[] args = {
+      name,
+      serdeClass,
+      inputFormatClass,
+      outputFormatClass,
+      storageCustomHandlerClass,
+    };
+    return args;
+  }
+
+  /**
+   * Generates a collection of parameters that can be used to parameterize a JUnit test fixture.
+   * Each parameter represents one storage format that the fixture will run against. The list
+   * includes both native Hive storage formats and those enumerated in the
+   * ADDITIONAL_STORAGE_FORMATS table.
+   *
+   * @return Collection of storage formats as test parameters.
+   */
+  public static Collection<Object[]> asParameters() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+
+    // Add test parameters from official storage formats registered with Hive via
+    // StorageFormatDescriptor.
+    final Configuration conf = new Configuration();
+    for (StorageFormatDescriptor descriptor : ServiceLoader.load(StorageFormatDescriptor.class)) {
+      String serdeClass = descriptor.getSerde();
+      if (serdeClass == null) {
+        if (descriptor instanceof RCFileStorageFormatDescriptor) {
+          serdeClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE);
+        } else {
+          serdeClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEDEFAULTSERDE);
+        }
+      }
+
+      String[] names = new String[descriptor.getNames().size()];
+      names = descriptor.getNames().toArray(names);
+      Object[] arguments = createTestArguments(names[0], serdeClass, descriptor.getInputFormat(),
+          descriptor.getOutputFormat());
+      parameters.add(arguments);
+    }
+
+    // Add test parameters from storage formats specified in the ADDITIONAL_STORAGE_FORMATS table.
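+    // Each row becomes one parameter set; e.g., the "avro" row above yields
+    // ("avro", AvroSerDe, AvroContainerInputFormat, AvroContainerOutputFormat,
+    // AvroStorageCustomHandler), which drives one run of every test fixture.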
+    for (int i = 0; i < ADDITIONAL_STORAGE_FORMATS.length; i++) {
+      String name = (String) ADDITIONAL_STORAGE_FORMATS[i][0];
+      String serdeClass = (String) ADDITIONAL_STORAGE_FORMATS[i][1];
+      String inputFormatClass = (String) ADDITIONAL_STORAGE_FORMATS[i][2];
+      String outputFormatClass = (String) ADDITIONAL_STORAGE_FORMATS[i][3];
+      String storageCustomHandlerClass = (String) ADDITIONAL_STORAGE_FORMATS[i][4];
+      assertTrue("InputFormat for storage format not set", inputFormatClass != null);
+      assertTrue("OutputFormat for storage format not set", outputFormatClass != null);
+      Object[] arguments = createTestArguments(name, serdeClass, inputFormatClass,
+          outputFormatClass, storageCustomHandlerClass);
+      parameters.add(arguments);
+    }
+
+    return parameters;
+  }
+}
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index 39e9208..2733fbd 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -53,9 +53,10 @@
   protected static final int NUM_RECORDS = 20;
   protected static final int NUM_PARTITIONS = 5;
 
-  @BeforeClass
-  public static void generateInputData() throws Exception {
-    tableName = "testHCatDynamicPartitionedTable";
+  public TestHCatDynamicPartitioned(String formatName, String serdeClass, String inputFormatClass,
+      String outputFormatClass, String storageCustomHandlerClass) throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+    tableName = "testHCatDynamicPartitionedTable_" + formatName;
     generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
     generateDataColumns();
   }
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
index 0838765..18b7fcf 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
@@ -24,18 +24,20 @@
 
 public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartitioned {
 
+  public TestHCatExternalDynamicPartitioned(String formatName, String serdeClass,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass)
+      throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+    tableName = "testHCatExternalDynamicPartitionedTable_" + formatName;
+    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateDataColumns();
+  }
+
   @Override
   protected Boolean isTableExternal() {
     return true;
   }
 
-  @BeforeClass
-  public static void generateInputData() throws Exception {
-    tableName = "testHCatExternalDynamicPartitionedTable";
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-    generateDataColumns();
-  }
-
   /**
    * Run the external dynamic partitioning test but with single map task
    * @throws Exception
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
index 01b2ad6..556d9b7 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
@@ -20,6 +20,11 @@
 package org.apache.hive.hcatalog.mapreduce;
 
 public class TestHCatExternalNonPartitioned extends TestHCatNonPartitioned {
+  public TestHCatExternalNonPartitioned(String formatName, String serdeName,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass)
+      throws Exception {
+    super(formatName, serdeName, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+  }
 
   @Override
   protected Boolean isTableExternal() {
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java
index e5f8d1e..eabb94a 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java
@@ -20,6 +20,11 @@
 package org.apache.hive.hcatalog.mapreduce;
 
 public class TestHCatExternalPartitioned extends TestHCatPartitioned {
+  public TestHCatExternalPartitioned(String formatName, String serdeClass,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass)
+      throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+  }
 
   @Override
   protected Boolean isTableExternal() {
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
index bfc6a4f..84dc005 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
@@ -20,6 +20,11 @@
 package org.apache.hive.hcatalog.mapreduce;
 
 public class TestHCatMutableDynamicPartitioned extends TestHCatDynamicPartitioned {
+  public TestHCatMutableDynamicPartitioned(String formatName, String serdeClass,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass)
+      throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+  }
 
   @Override
   protected boolean isTableImmutable() {
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
index a944023..3590fc6 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
@@ -20,7 +20,11 @@
 package org.apache.hive.hcatalog.mapreduce;
 
 public class TestHCatMutableNonPartitioned extends TestHCatNonPartitioned {
-
+  public TestHCatMutableNonPartitioned(String formatName, String serdeClass,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass)
+      throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+  }
 
   @Override
   protected boolean isTableImmutable() {
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
index 73d6e80..e9a45e0 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
@@ -20,6 +20,11 @@
 package org.apache.hive.hcatalog.mapreduce;
 
 public class TestHCatMutablePartitioned extends TestHCatPartitioned {
+  public TestHCatMutablePartitioned(String formatName, String serdeClass,
+      String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass)
+      throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+  }
 
   @Override
   protected boolean isTableImmutable() {
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
index d639e99..6c04710 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
@@ -43,16 +43,14 @@
 import static org.junit.Assert.assertNull;
 
 public class TestHCatNonPartitioned extends HCatMapReduceTest {
-
   private static List<HCatRecord> writeRecords;
   static List<HCatFieldSchema> partitionColumns;
 
-  @BeforeClass
-  public static void oneTimeSetUp() throws Exception {
-
+  public TestHCatNonPartitioned(String formatName, String serdeClass, String inputFormatClass,
+      String outputFormatClass, String storageCustomHandlerClass) throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
     dbName = null; //test if null dbName works ("default" is used)
-    tableName = "testHCatNonPartitionedTable";
-
+    tableName = "testHCatNonPartitionedTable_" + formatName;
     writeRecords = new ArrayList<HCatRecord>();
 
     for (int i = 0; i < 20; i++) {
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
index dc05f43..033b613 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
@@ -49,10 +49,10 @@
   private static List<HCatRecord> writeRecords;
   private static List<HCatFieldSchema> partitionColumns;
 
-  @BeforeClass
-  public static void oneTimeSetUp() throws Exception {
-
-    tableName = "testHCatPartitionedTable";
+  public TestHCatPartitioned(String formatName, String serdeClass, String inputFormatClass,
+      String outputFormatClass, String storageCustomHandlerClass) throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass);
+    tableName = "testHCatPartitionedTable_" + formatName;
 
     writeRecords = new ArrayList<HCatRecord>();
 
     for (int i = 0; i < 20; i++) {
@@ -68,7 +68,6 @@
     partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2",
         serdeConstants.STRING_TYPE_NAME, "")));
   }
-
   @Override
   protected List<FieldSchema> getPartitionKeys() {
     List<FieldSchema> fields = new ArrayList<FieldSchema>();
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java
new file mode 100644
index 0000000..e2af6a5
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.avro.TypeInfoToSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * Custom logic for running tests against the Avro storage format.
+ */
+public class AvroStorageCustomHandler extends StorageCustomHandler {
+  /**
+   * Takes the schema of the table and converts it to an Avro schema in order to set the required
+   * `avro.schema.literal` property of the table.
+   *
+   * @param table The table object.
+   */
+  @Override
+  public void setCustomTableProperties(Table table) {
+    StorageDescriptor sd = table.getSd();
+    List<FieldSchema> cols = sd.getCols();
+    List<String> columnNames = new ArrayList<String>();
+    List<TypeInfo> columnTypes = new ArrayList<TypeInfo>();
+    for (FieldSchema col : cols) {
+      columnNames.add(col.getName());
+      columnTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
+    }
+    Schema avroSchema = TypeInfoToSchema.convert(columnTypes, columnNames);
+
+    // Add to the existing table parameters rather than replacing them, so properties such as
+    // EXTERNAL and IS_IMMUTABLE already set by the test are preserved.
+    Map<String, String> tableParams = table.getParameters();
+    if (tableParams == null) {
+      tableParams = new HashMap<String, String>();
+    }
+    tableParams.put("avro.schema.literal", avroSchema.toString());
+    table.setParameters(tableParams);
+  }
+}
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java
new file mode 100644
index 0000000..36f5592
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce.storage;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Class used to handle custom logic, such as setting custom table properties,
+ * when running HCatalog core tests against a storage format.
+ */
+public class StorageCustomHandler {
+  /**
+   * Sets custom table properties. By default, this method is a no-op.
+   *
+   * @param table The table object.
+   */
+  public void setCustomTableProperties(Table table) {
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
index 1bae0a8..9add316 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
@@ -33,6 +33,7 @@
   public static final String ORCFILE = "ORCFILE";
   public static final String PARQUET = "PARQUET";
   public static final String PARQUETFILE = "PARQUETFILE";
+  public static final String AVRO = "AVRO";
 
   @VisibleForTesting
   public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde";
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
new file mode 100644
index 0000000..68c6176
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+public class TypeInfoToSchema {
+  /**
+   * Converts a list of Hive TypeInfo and column names into an Avro Schema.
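+   *
+   * For example (illustrative only), a single column c1 of Hive type array&lt;int&gt; converts to:
+   * {"type":"record","name":"schema","fields":[{"name":"c1","type":{"type":"array","items":"int"}}]}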
+ * + * @param columnTypes + * @param columnNames + * @return Avro schema + */ + public static Schema convert(List columnTypes, List columnNames) { + List fields = new ArrayList(); + for (int i = 0; i < columnTypes.size(); i++) { + final Schema fieldSchema = convert(columnTypes.get(i), columnNames.get(i)); + fields.add(new Schema.Field(columnNames.get(i), fieldSchema, null, null)); + } + Schema recordSchema = Schema.createRecord("schema", null, null, false); + recordSchema.setFields(fields); + return recordSchema; + } + + private static Schema convert(TypeInfo typeInfo, String fieldName) { + final Category typeCategory = typeInfo.getCategory(); + switch (typeCategory) { + case PRIMITIVE: + return convertPrimitive(fieldName, (PrimitiveTypeInfo) typeInfo); + case LIST: + return convertList(fieldName, (ListTypeInfo) typeInfo); + case STRUCT: + return convertStruct(fieldName, (StructTypeInfo) typeInfo); + case MAP: + return convertMap(fieldName, (MapTypeInfo) typeInfo); + default: + throw new TypeNotPresentException(typeInfo.getTypeName(), null); + } + } + + private static Schema convertPrimitive(String fieldName, PrimitiveTypeInfo typeInfo) { + if (typeInfo.equals(TypeInfoFactory.intTypeInfo) + || typeInfo.equals(TypeInfoFactory.shortTypeInfo) + || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { + return Schema.create(Schema.Type.INT); + } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { + return Schema.create(Schema.Type.LONG); + } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { + return Schema.create(Schema.Type.FLOAT); + } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { + return Schema.create(Schema.Type.DOUBLE); + } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { + return Schema.create(Schema.Type.BOOLEAN); + } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + return Schema.create(Schema.Type.STRING); + } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) { + return Schema.create(Schema.Type.BYTES); + } else if (typeInfo.equals(TypeInfoFactory.decimalTypeInfo)) { + throw new RuntimeException("Decimal type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { + throw new RuntimeException("Timestamp type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { + throw new RuntimeException("Date type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.varcharTypeInfo)) { + throw new RuntimeException("Varchar type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.unknownTypeInfo)) { + throw new RuntimeException("Unknown type not supported."); + } else { + throw new RuntimeException("Unknown type: " + typeInfo); + } + } + + private static Schema convertList(String fieldName, ListTypeInfo typeInfo) { + final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo(); + final Schema elementType = convert(elementTypeInfo, "array_element"); + return Schema.createArray(elementType); + } + + private static Schema convertStruct(String fieldName, StructTypeInfo typeInfo) { + final List columnNames = typeInfo.getAllStructFieldNames(); + final List columnTypeInfos = typeInfo.getAllStructFieldTypeInfos(); + List fields = new ArrayList(); + for (int i = 0; i < columnNames.size(); i++) { + final String columnFieldName = columnNames.get(i); + final Schema fieldSchema = convert(columnTypeInfos.get(i), columnFieldName); + fields.add(new Schema.Field(columnFieldName, fieldSchema, null, null)); + } + final Schema recordSchema = Schema.createRecord(fieldName, null, 
+    recordSchema.setFields(fields);
+    return recordSchema;
+  }
+
+  private static Schema convertMap(String fieldName, MapTypeInfo typeInfo) {
+    final TypeInfo keyTypeInfo = typeInfo.getMapKeyTypeInfo();
+    if (!keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+      throw new RuntimeException("Avro does not support maps with key type: " + keyTypeInfo);
+    }
+    final Schema valueType = convert(typeInfo.getMapValueTypeInfo(), "value");
+    return Schema.createMap(valueType);
+  }
+}
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
new file mode 100644
index 0000000..664d0e0
--- /dev/null
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import com.google.common.io.Resources;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTypeInfoToSchema {
+  @Test
+  public void testAllTypes() throws Exception {
+    String fieldNames =
+        "myboolean," +
+        "myint," +
+        "mylong," +
+        "myfloat," +
+        "mydouble," +
+        "mybytes," +
+        "mystring," +
+        "myrecord," +
+        "myarray," +
+        "mymap";
+    String fieldTypes =
+        "boolean," +
+        "int," +
+        "bigint," +
+        "float," +
+        "double," +
+        "binary," +
+        "string," +
+        "struct<mynestedint:int>," +
+        "array<int>," +
+        "map<string,int>";
+
+    List<String> columnNames = Arrays.asList(fieldNames.split(","));
+    List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypes);
+
+    assertEquals(columnNames.size(), columnTypes.size());
+
+    Schema avroSchema = TypeInfoToSchema.convert(columnTypes, columnNames);
+    Schema expectedSchema = new Schema.Parser().parse(
+        Resources.getResource("alltypes.avsc").openStream());
+
+    assertEquals("Expected: " + expectedSchema.toString(true)
+        + "\nGot: " + avroSchema.toString(true),
+        expectedSchema.toString(), avroSchema.toString());
+  }
+}
diff --git a/serde/src/test/resources/alltypes.avsc b/serde/src/test/resources/alltypes.avsc
new file mode 100644
index 0000000..39be023
--- /dev/null
+++ b/serde/src/test/resources/alltypes.avsc
@@ -0,0 +1,48 @@
+{
+  "type" : "record",
+  "name" : "schema",
+  "fields" : [ {
+    "name" : "myboolean",
+    "type" : "boolean"
+  }, {
+    "name" : "myint",
+    "type" : "int"
+  }, {
+    "name" : "mylong",
+    "type" : "long"
+  }, {
+    "name" : "myfloat",
+    "type" : "float"
+  }, {
+    "name" : "mydouble",
+    "type" : "double"
+  }, {
+    "name" : "mybytes",
+    "type" : "bytes"
+  }, {
+    "name" : "mystring",
"type" : "string" + }, { + "name" : "myrecord", + "type" : { + "type" : "record", + "name" : "myrecord", + "fields" : [ { + "name" : "mynestedint", + "type" : "int" + } ] + } + }, { + "name" : "myarray", + "type" : { + "type" : "array", + "items" : "int" + } + }, { + "name" : "mymap", + "type" : { + "type" : "map", + "values" : "int" + } + } ] +} -- 1.8.3.4 (Apple Git-47)