diff --git a/.gitignore b/.gitignore index d0c97d1..fa9773a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ common/src/gen *.iml *.ipr *.iws +*.swp derby.log datanucleus.log .arc diff --git a/hcatalog/core/pom.xml b/hcatalog/core/pom.xml index b5e85cd..24aecf9 100644 --- a/hcatalog/core/pom.xml +++ b/hcatalog/core/pom.xml @@ -71,6 +71,12 @@ jackson-mapper-asl ${jackson.version} + + org.reflections + reflections + 0.9.9-RC1 + test + diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java deleted file mode 100644 index f68dbb8..0000000 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/fileformats/TestOrcDynamicPartitioned.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hive.hcatalog.fileformats; - -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcSerde; -import org.apache.hive.hcatalog.mapreduce.TestHCatDynamicPartitioned; -import org.junit.BeforeClass; - -public class TestOrcDynamicPartitioned extends TestHCatDynamicPartitioned { - - @BeforeClass - public static void generateInputData() throws Exception { - tableName = "testOrcDynamicPartitionedTable"; - generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); - generateDataColumns(); - } - - @Override - protected String inputFormat() { - return OrcInputFormat.class.getName(); - } - - @Override - protected String outputFormat() { - return OrcOutputFormat.class.getName(); - } - - @Override - protected String serdeClass() { - return OrcSerde.class.getName(); - } - -} diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java index 9ddc3a6..3efbed2 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java @@ -20,12 +20,16 @@ package org.apache.hive.hcatalog.mapreduce; import java.io.IOException; +import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; - -import junit.framework.Assert; +import java.util.ServiceLoader; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -40,9 +44,31 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import 
org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedColumnarSerDe; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.ql.io.RCFileStorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat; +import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcSerde; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.serde2.ByteStreamTypedSerDe; +import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe; +import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; +import org.apache.hadoop.hive.serde2.NullStructSerDe; +import org.apache.hadoop.hive.serde2.RegexSerDe; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.TypedSerDe; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; +import org.apache.hadoop.hive.serde2.thrift.ThriftByteStreamTypedSerDe; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; @@ -59,52 +85,249 @@ import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.storage.AvroStorageCustomHandler; +import org.apache.hive.hcatalog.mapreduce.storage.StorageCustomHandler; + +import junit.framework.Assert; + import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.reflections.Reflections; + import static org.junit.Assert.assertTrue; /** * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads - * it back using HCatInputFormat, checks the column values and counts. + * it back using HCatInputFormat, checks the column values and counts. This class + * can be subclassed to test different partitioning schemes. + * + * This is a parameterized test that runs HCatOutputFormat and HCatInputFormat against all + * storage formats in the Hive codebase. All SerDes must be either registered with Hive + * as a native storage format via {@link org.apache.hadoop.hive.ql.io.StorageFormatDescriptor} or + * enumerated in ADDITIONAL_STORAGE_FORMATS; otherwise, the test will fail. + * Storage formats that fail HCatalog Core tests or are untested against HCatalog can be marked + * as disabled by being registered in DISABLED_SERDES to skip running tests against them. 
*/ +@RunWith(Parameterized.class) public abstract class HCatMapReduceTest extends HCatBaseTest { - private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class); + + /** + * Table of additional storage formats for HCatMapReduceTest. These are SerDes, or combinations + * of a SerDe with an InputFormat and OutputFormat, that are not registered as a native Hive storage + * format. + * + * Each row in this table has the following fields: + * - formatName - A string name for the storage format. This is used to give the table created + * for the test a unique name. + * - serdeClass - The name of the SerDe class used by the storage format. + * - inputFormatClass - The name of the InputFormat class. + * - outputFormatClass - The name of the OutputFormat class. + * - storageCustomHandlerClass - The name of the StorageCustomHandler class. See + * {@link org.apache.hive.hcatalog.mapreduce.storage.StorageCustomHandler}. + */ + protected static final Object[][] ADDITIONAL_STORAGE_FORMATS = new Object[][] { + { + "rcfile_columnar", + ColumnarSerDe.class.getName(), + RCFileInputFormat.class.getName(), + RCFileOutputFormat.class.getName(), + null, + }, { + "avro", + AvroSerDe.class.getName(), + AvroContainerInputFormat.class.getName(), + AvroContainerOutputFormat.class.getName(), + AvroStorageCustomHandler.class.getName(), + } + }; + + /** + * List of SerDe classes that the HCatalog core tests will not be run against. + */ + protected static final String[] DISABLED_SERDES = new String[] { + AvroSerDe.class.getName(), + ParquetHiveSerDe.class.getName(), + MetadataTypedColumnsetSerDe.class.getName(), + LazyBinarySerDe.class.getName(), + NullStructSerDe.class.getName(), + RegexSerDe.class.getName(), + VectorizedOrcSerde.class.getName(), + ThriftByteStreamTypedSerDe.class.getName(), + ByteStreamTypedSerDe.class.getName(), + VectorizedColumnarSerDe.class.getName(), + DelimitedJSONSerDe.class.getName(), + DynamicSerDe.class.getName(), + BinarySortableSerDe.class.getName(), + TypedSerDe.class.getName(), + }; + protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME; - protected static String tableName = "testHCatMapReduceTable"; + protected static final String TABLE_NAME = "testHCatMapReduceTable"; private static List writeRecords = new ArrayList(); private static List readRecords = new ArrayList(); - protected abstract List getPartitionKeys(); - - protected abstract List getTableColumns(); - private static FileSystem fs; private String externalTableLocation = null; + protected String tableName; + protected String serdeClass; + protected String inputFormatClass; + protected String outputFormatClass; + protected StorageCustomHandler storageCustomHandler; - protected Boolean isTableExternal() { - return false; + /** + * Create an array of Objects used to populate the test parameters. + * + * @param name Name of the storage format. + * @param serdeClass Name of the SerDe class. + * @param inputFormatClass Name of the InputFormat class. + * @param outputFormatClass Name of the OutputFormat class. + * @return Object array containing the arguments. + */ + protected static Object[] createTestArguments(String name, String serdeClass, + String inputFormatClass, String outputFormatClass) { + return createTestArguments(name, serdeClass, inputFormatClass, outputFormatClass, null); } - protected boolean isTableImmutable() { - return true; + /** + * Create an array of Objects used to populate the test parameters. + * + * @param name Name of the storage format. 
+ * @param serdeClass Name of the SerDe class. + * @param inputFormatClass Name of the InputFormat class. + * @param outputFormatClass Name of the OutputFormat class. + * @param storageCustomHandlerClass Name of the StorageCustomHandler class. + * @return Object array containing the arguments. + */ + protected static Object[] createTestArguments(String name, String serdeClass, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) { + Object[] args = { + name, + serdeClass, + inputFormatClass, + outputFormatClass, + storageCustomHandlerClass, + }; + return args; + } + + /** + * Generate parameters that the test fixture will be run against. + * Each parameter represents one storage format that the fixture will run against. + * First, the native Hive storage formats registered with Hive via + * {@link org.apache.hadoop.hive.ql.io.StorageFormatDescriptor} are added. Then, storage formats + * enumerated in the ADDITIONAL_STORAGE_FORMATS table are added. + * + * Finally, all classes in the classpath that implement the + * {@link org.apache.hadoop.hive.serde2.SerDe} interface are enumerated using reflection. For + * each SerDe, check whether it has been added as a native storage format, registered + * in ADDITIONAL_STORAGE_FORMATS, or disabled by being registered in DISABLED_SERDES. If not, + * raise a test failure. + * + * @return Parameters for the test. + */ + @Parameterized.Parameters + public static Collection generateParameters() { + List parameters = new ArrayList(); + Set testSerdes = new HashSet(); + + // Create set for quick lookup of disabled SerDes. + Set disabledSerdes = new HashSet(); + disabledSerdes.addAll(Arrays.asList(DISABLED_SERDES)); + + // Add test parameters from official storage formats registered with Hive via + // StorageFormatDescriptor. + final Configuration conf = new Configuration(); + for (StorageFormatDescriptor descriptor : ServiceLoader.load(StorageFormatDescriptor.class)) { + String serdeClass = descriptor.getSerde(); + if (serdeClass == null) { + if (descriptor instanceof RCFileStorageFormatDescriptor) { + serdeClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE); + } else { + serdeClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEDEFAULTSERDE); + } + } + + if (disabledSerdes.contains(serdeClass)) { + continue; + } + + String[] names = new String[descriptor.getNames().size()]; + names = descriptor.getNames().toArray(names); + Object[] arguments = createTestArguments(names[0], serdeClass, descriptor.getInputFormat(), + descriptor.getOutputFormat()); + parameters.add(arguments); + testSerdes.add(serdeClass); + } + + // Add test parameters from storage formats specified in ADDITIONAL_STORAGE_FORMATS table. 
+ for (int i = 0; i < ADDITIONAL_STORAGE_FORMATS.length; i++) { + String serdeClass = (String) ADDITIONAL_STORAGE_FORMATS[i][1]; + if (disabledSerdes.contains(serdeClass)) { + continue; + } + String name = (String) ADDITIONAL_STORAGE_FORMATS[i][0]; + String inputFormatClass = (String) ADDITIONAL_STORAGE_FORMATS[i][2]; + String outputFormatClass = (String) ADDITIONAL_STORAGE_FORMATS[i][3]; + String storageCustomHandlerClass = (String) ADDITIONAL_STORAGE_FORMATS[i][4]; + assertTrue("InputFormat for storage format not set", inputFormatClass != null); + assertTrue("OutputFormat for storage format not set", outputFormatClass != null); + Object[] arguments = createTestArguments(name, serdeClass, inputFormatClass, + outputFormatClass, storageCustomHandlerClass); + parameters.add(arguments); + testSerdes.add(serdeClass); + } + + // Verify that there are no SerDes in Hive that have not been covered. + Reflections reflections = new Reflections("org.apache.hadoop.hive"); + Set> serdes = reflections.getSubTypesOf(SerDe.class); + for (Class serde : serdes) { + // Skip if SerDe class is abstract. For example, AbstractSerDe and ColumnarSerDeBase. + if (Modifier.isAbstract(serde.getModifiers())) { + continue; + } + String serdeClass = serde.getName(); + assertTrue("SerDe " + serdeClass + " has not been added to the HCatMapReduceTest " + + "parameters. Please add " + serdeClass + " to the test parameters.", + testSerdes.contains(serdeClass) || disabledSerdes.contains(serdeClass)); + } + return parameters; } - protected String inputFormat() { - return RCFileInputFormat.class.getName(); + /** + * Test constructor that sets the storage format class names provided by the test parameter. + */ + public HCatMapReduceTest(String name, String serdeClass, String inputFormatClass, + String outputFormatClass, String storageCustomHandlerClass) throws Exception { + this.serdeClass = serdeClass; + this.inputFormatClass = inputFormatClass; + this.outputFormatClass = outputFormatClass; + if (storageCustomHandlerClass != null) { + this.storageCustomHandler = (StorageCustomHandler) Class.forName(storageCustomHandlerClass) + .newInstance(); + } + this.tableName = TABLE_NAME + "_" + name; } - protected String outputFormat() { - return RCFileOutputFormat.class.getName(); + protected abstract List getPartitionKeys(); + + protected abstract List getTableColumns(); + + protected Boolean isTableExternal() { + return false; } - protected String serdeClass() { - return ColumnarSerDe.class.getName(); + protected boolean isTableImmutable() { + return true; } @BeforeClass @@ -144,12 +367,11 @@ public void deleteTable() throws Exception { @Before public void createTable() throws Exception { String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; - try { client.dropTable(databaseName, tableName); } catch (Exception e) { - } //can fail with NoSuchObjectException - + // Can fail with NoSuchObjectException. 
+ } Table tbl = new Table(); tbl.setDbName(databaseName); @@ -160,10 +382,9 @@ public void createTable() throws Exception { tbl.setTableType(TableType.MANAGED_TABLE.toString()); } StorageDescriptor sd = new StorageDescriptor(); - sd.setCols(getTableColumns()); - tbl.setPartitionKeys(getPartitionKeys()); + tbl.setPartitionKeys(getPartitionKeys()); tbl.setSd(sd); sd.setBucketCols(new ArrayList(2)); @@ -171,12 +392,12 @@ public void createTable() throws Exception { sd.getSerdeInfo().setName(tbl.getTableName()); sd.getSerdeInfo().setParameters(new HashMap()); sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); - if (isTableExternal()){ + if (isTableExternal()) { sd.getSerdeInfo().getParameters().put("EXTERNAL", "TRUE"); } - sd.getSerdeInfo().setSerializationLib(serdeClass()); - sd.setInputFormat(inputFormat()); - sd.setOutputFormat(outputFormat()); + sd.getSerdeInfo().setSerializationLib(serdeClass); + sd.setInputFormat(inputFormatClass); + sd.setOutputFormat(outputFormatClass); Map tableParams = new HashMap(); if (isTableExternal()) { @@ -186,72 +407,66 @@ public void createTable() throws Exception { tableParams.put(hive_metastoreConstants.IS_IMMUTABLE,"true"); } tbl.setParameters(tableParams); + if (storageCustomHandler != null) { + storageCustomHandler.setCustomTableProperties(tbl); + } client.createTable(tbl); } - //Create test input file with specified number of rows + /* + * Create test input file with specified number of rows + */ private void createInputFile(Path path, int rowCount) throws IOException { - if (fs.exists(path)) { fs.delete(path, true); } FSDataOutputStream os = fs.create(path); - for (int i = 0; i < rowCount; i++) { os.writeChars(i + "\n"); } - os.close(); } - public static class MapCreate extends - Mapper { - - static int writeCount = 0; //test will be in local mode + public static class MapCreate extends Mapper { + // Test will be in local mode. + static int writeCount = 0; @Override - public void map(LongWritable key, Text value, Context context - ) throws IOException, InterruptedException { - { - try { - HCatRecord rec = writeRecords.get(writeCount); - context.write(null, rec); - writeCount++; - - } catch (Exception e) { - - e.printStackTrace(System.err); //print since otherwise exception is lost - throw new IOException(e); - } + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + try { + HCatRecord rec = writeRecords.get(writeCount); + context.write(null, rec); + writeCount++; + } catch (Exception e) { + // Print since otherwise exception is lost. + e.printStackTrace(System.err); + throw new IOException(e); } } } - public static class MapRead extends - Mapper { - + public static class MapRead extends Mapper { static int readCount = 0; //test will be in local mode @Override - public void map(WritableComparable key, HCatRecord value, Context context - ) throws IOException, InterruptedException { - { - try { - readRecords.add(value); - readCount++; - } catch (Exception e) { - e.printStackTrace(); //print since otherwise exception is lost - throw new IOException(e); - } + public void map(WritableComparable key, HCatRecord value, Context context) + throws IOException, InterruptedException { + try { + readRecords.add(value); + readCount++; + } catch (Exception e) { + // Print since otherwise exception is lost. 
+ e.printStackTrace(); + throw new IOException(e); } } } - Job runMRCreate(Map partitionValues, - List partitionColumns, List records, - int writeCount, boolean assertWrite) throws Exception { + Job runMRCreate(Map partitionValues, List partitionColumns, + List records, int writeCount, boolean assertWrite) throws Exception { return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true, null); } @@ -267,10 +482,9 @@ Job runMRCreate(Map partitionValues, * @return * @throws Exception */ - Job runMRCreate(Map partitionValues, - List partitionColumns, List records, - int writeCount, boolean assertWrite, boolean asSingleMapTask, - String customDynamicPathPattern) throws Exception { + Job runMRCreate(Map partitionValues, List partitionColumns, + List records, int writeCount, boolean assertWrite, boolean asSingleMapTask, + String customDynamicPathPattern) throws Exception { writeRecords = records; MapCreate.writeCount = 0; @@ -355,7 +569,6 @@ Job runMRCreate(Map partitionValues, * @throws Exception */ List runMRRead(int readCount, String filter) throws Exception { - MapRead.readCount = 0; readRecords.clear(); @@ -388,9 +601,7 @@ Job runMRCreate(Map partitionValues, return readRecords; } - protected HCatSchema getTableSchema() throws Exception { - Configuration conf = new Configuration(); Job job = new Job(conf, "hcat mapreduce read schema test"); job.setJarByClass(this.getClass()); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java index 39e9208..2733fbd 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java @@ -53,9 +53,10 @@ protected static final int NUM_RECORDS = 20; protected static final int NUM_PARTITIONS = 5; - @BeforeClass - public static void generateInputData() throws Exception { - tableName = "testHCatDynamicPartitionedTable"; + public TestHCatDynamicPartitioned(String formatName, String serdeClass, String inputFormatClass, + String outputFormatClass, String storageCustomHandlerClass) throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + tableName = "testHCatDynamicPartitionedTable_" + formatName; generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); generateDataColumns(); } diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java index 0838765..18b7fcf 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java @@ -24,18 +24,20 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartitioned { + public TestHCatExternalDynamicPartitioned(String formatName, String serdeClass, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) + throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + tableName = "testHCatExternalDynamicPartitionedTable_" + formatName; + generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + generateDataColumns(); + } + @Override 
protected Boolean isTableExternal() { return true; } - @BeforeClass - public static void generateInputData() throws Exception { - tableName = "testHCatExternalDynamicPartitionedTable"; - generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); - generateDataColumns(); - } - /** * Run the external dynamic partitioning test but with single map task * @throws Exception diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java index 01b2ad6..556d9b7 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java @@ -20,6 +20,11 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatExternalNonPartitioned extends TestHCatNonPartitioned { + public TestHCatExternalNonPartitioned(String formatName, String serdeName, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) + throws Exception { + super(formatName, serdeName, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + } @Override protected Boolean isTableExternal() { diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java index e5f8d1e..eabb94a 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java @@ -20,6 +20,11 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatExternalPartitioned extends TestHCatPartitioned { + public TestHCatExternalPartitioned(String formatName, String serdeClass, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) + throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + } @Override protected Boolean isTableExternal() { diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java index bfc6a4f..84dc005 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java @@ -20,6 +20,11 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatMutableDynamicPartitioned extends TestHCatDynamicPartitioned { + public TestHCatMutableDynamicPartitioned(String formatName, String serdeClass, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) + throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + } @Override protected boolean isTableImmutable() { diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java index a944023..3590fc6 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java +++ 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java @@ -20,7 +20,11 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatMutableNonPartitioned extends TestHCatNonPartitioned { - + public TestHCatMutableNonPartitioned(String formatName, String serdeClass, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) + throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + } @Override protected boolean isTableImmutable() { diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java index 73d6e80..e9a45e0 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java @@ -20,6 +20,11 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatMutablePartitioned extends TestHCatPartitioned { + public TestHCatMutablePartitioned(String formatName, String serdeClass, + String inputFormatClass, String outputFormatClass, String storageCustomHandlerClass) + throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); + } @Override protected boolean isTableImmutable() { diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java index d639e99..6c04710 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java @@ -43,16 +43,14 @@ import static org.junit.Assert.assertNull; public class TestHCatNonPartitioned extends HCatMapReduceTest { - private static List writeRecords; static List partitionColumns; - @BeforeClass - public static void oneTimeSetUp() throws Exception { - + public TestHCatNonPartitioned(String formatName, String serdeClass, String inputFormatClass, + String outputFormatClass, String storageCustomHandlerClass) throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); dbName = null; //test if null dbName works ("default" is used) - tableName = "testHCatNonPartitionedTable"; - + tableName = "testHCatNonPartitionedTable_" + formatName; writeRecords = new ArrayList(); for (int i = 0; i < 20; i++) { diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java index dc05f43..033b613 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java @@ -49,10 +49,10 @@ private static List writeRecords; private static List partitionColumns; - @BeforeClass - public static void oneTimeSetUp() throws Exception { - - tableName = "testHCatPartitionedTable"; + public TestHCatPartitioned(String formatName, String serdeClass, String inputFormatClass, + String outputFormatClass, String storageCustomHandlerClass) throws Exception { + super(formatName, serdeClass, inputFormatClass, outputFormatClass, storageCustomHandlerClass); 
+ tableName = "testHCatPartitionedTable_" + formatName; writeRecords = new ArrayList(); for (int i = 0; i < 20; i++) { @@ -68,7 +68,6 @@ public static void oneTimeSetUp() throws Exception { partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""))); } - @Override protected List getPartitionKeys() { List fields = new ArrayList(); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java new file mode 100644 index 0000000..e2af6a5 --- /dev/null +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/AvroStorageCustomHandler.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.mapreduce.storage; + +import com.google.common.io.Resources; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.avro.TypeInfoToSchema; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Custom logic for running tests against the Avro storage format. + */ +public class AvroStorageCustomHandler extends StorageCustomHandler { + /** + * Takes the schema of the table and converts it to an Avro schema in order to set the required + * `avro.schema.literal` property of the table. + * + * @param table The table object. 
+ */ + @Override + public void setCustomTableProperties(Table table) { + StorageDescriptor sd = table.getSd(); + List cols = sd.getCols(); + List columnNames = new ArrayList(); + List columnTypes = new ArrayList(); + for (FieldSchema col : cols) { + columnNames.add(col.getName()); + columnTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType())); + } + Schema avroSchema = TypeInfoToSchema.convert(columnTypes, columnNames); + + Map tableParams = new HashMap(); + tableParams.put("avro.schema.literal", avroSchema.toString()); + table.setParameters(tableParams); + } +} + diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java new file mode 100644 index 0000000..36f5592 --- /dev/null +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/storage/StorageCustomHandler.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.mapreduce.storage; + +import org.apache.hadoop.hive.metastore.api.Table; + +/** + * Class used to handle custom logic, such as setting custom table properties, + * when running HCatalog core tests against a storage format. + */ +public class StorageCustomHandler { + /** + * Sets custom table properties. By default, this method is a no-op. + * + * @param table The table object. + */ + public void setCustomTableProperties(Table table) { + } +} diff --git a/ql/pom.xml b/ql/pom.xml index 3ed8e57..f215262 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -444,6 +444,14 @@ + org.apache.maven.plugins + maven-surefire-plugin + 2.4.2 + + true + + + org.antlr antlr3-maven-plugin diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index 1bae0a8..9add316 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -33,6 +33,7 @@ public static final String ORCFILE = "ORCFILE"; public static final String PARQUET = "PARQUET"; public static final String PARQUETFILE = "PARQUETFILE"; + public static final String AVRO = "AVRO"; @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java new file mode 100644 index 0000000..68c6176 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.serde2.avro; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +public class TypeInfoToSchema { + /** + * Converts a list of Hive TypeInfo and column names into an Avro Schema. + * + * @param columnTypes + * @param columnNames + * @return Avro schema + */ + public static Schema convert(List columnTypes, List columnNames) { + List fields = new ArrayList(); + for (int i = 0; i < columnTypes.size(); i++) { + final Schema fieldSchema = convert(columnTypes.get(i), columnNames.get(i)); + fields.add(new Schema.Field(columnNames.get(i), fieldSchema, null, null)); + } + Schema recordSchema = Schema.createRecord("schema", null, null, false); + recordSchema.setFields(fields); + return recordSchema; + } + + private static Schema convert(TypeInfo typeInfo, String fieldName) { + final Category typeCategory = typeInfo.getCategory(); + switch (typeCategory) { + case PRIMITIVE: + return convertPrimitive(fieldName, (PrimitiveTypeInfo) typeInfo); + case LIST: + return convertList(fieldName, (ListTypeInfo) typeInfo); + case STRUCT: + return convertStruct(fieldName, (StructTypeInfo) typeInfo); + case MAP: + return convertMap(fieldName, (MapTypeInfo) typeInfo); + default: + throw new TypeNotPresentException(typeInfo.getTypeName(), null); + } + } + + private static Schema convertPrimitive(String fieldName, PrimitiveTypeInfo typeInfo) { + if (typeInfo.equals(TypeInfoFactory.intTypeInfo) + || typeInfo.equals(TypeInfoFactory.shortTypeInfo) + || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { + return Schema.create(Schema.Type.INT); + } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { + return Schema.create(Schema.Type.LONG); + } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { + return Schema.create(Schema.Type.FLOAT); + } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { + return Schema.create(Schema.Type.DOUBLE); + } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { + return Schema.create(Schema.Type.BOOLEAN); + } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + return Schema.create(Schema.Type.STRING); + } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) { + return Schema.create(Schema.Type.BYTES); + } 
else if (typeInfo.equals(TypeInfoFactory.decimalTypeInfo)) { + throw new RuntimeException("Decimal type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { + throw new RuntimeException("Timestamp type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { + throw new RuntimeException("Date type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.varcharTypeInfo)) { + throw new RuntimeException("Varchar type not supported."); + } else if (typeInfo.equals(TypeInfoFactory.unknownTypeInfo)) { + throw new RuntimeException("Unknown type not supported."); + } else { + throw new RuntimeException("Unknown type: " + typeInfo); + } + } + + private static Schema convertList(String fieldName, ListTypeInfo typeInfo) { + final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo(); + final Schema elementType = convert(elementTypeInfo, "array_element"); + return Schema.createArray(elementType); + } + + private static Schema convertStruct(String fieldName, StructTypeInfo typeInfo) { + final List columnNames = typeInfo.getAllStructFieldNames(); + final List columnTypeInfos = typeInfo.getAllStructFieldTypeInfos(); + List fields = new ArrayList(); + for (int i = 0; i < columnNames.size(); i++) { + final String columnFieldName = columnNames.get(i); + final Schema fieldSchema = convert(columnTypeInfos.get(i), columnFieldName); + fields.add(new Schema.Field(columnFieldName, fieldSchema, null, null)); + } + final Schema recordSchema = Schema.createRecord(fieldName, null, null, false); + recordSchema.setFields(fields); + return recordSchema; + } + + private static Schema convertMap(String fieldName, MapTypeInfo typeInfo) { + final TypeInfo keyTypeInfo = typeInfo.getMapKeyTypeInfo(); + if (!keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + throw new RuntimeException("Avro does not support maps with key type: " + keyTypeInfo); + } + final Schema valueType = convert(typeInfo.getMapValueTypeInfo(), "value"); + return Schema.createMap(valueType); + } +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java new file mode 100644 index 0000000..664d0e0 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestTypeInfoToSchema.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.serde2.avro; + +import com.google.common.io.Resources; + +import java.util.Arrays; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestTypeInfoToSchema { + @Test + public void testAllTypes() throws Exception { + String fieldNames = + "myboolean," + + "myint," + + "mylong," + + "myfloat," + + "mydouble," + + "mybytes," + + "mystring," + + "myrecord," + + "myarray," + + "mymap"; + String fieldTypes = + "boolean," + + "int," + + "bigint," + + "float," + + "double," + + "binary," + + "string," + + "struct<mynestedint:int>," + + "array<int>," + + "map<string,int>"; + + List columnNames = Arrays.asList(fieldNames.split(",")); + List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypes); + + assertEquals(columnNames.size(), columnTypes.size()); + + Schema avroSchema = TypeInfoToSchema.convert(columnTypes, columnNames); + Schema expectedSchema = new Schema.Parser().parse( + Resources.getResource("alltypes.avsc").openStream()); + + assertEquals("Expected: " + expectedSchema.toString(true) + + "\nGot: " + avroSchema.toString(true), + expectedSchema.toString(), avroSchema.toString()); + } +} diff --git a/serde/src/test/resources/alltypes.avsc b/serde/src/test/resources/alltypes.avsc new file mode 100644 index 0000000..39be023 --- /dev/null +++ b/serde/src/test/resources/alltypes.avsc @@ -0,0 +1,48 @@ +{ + "type" : "record", + "name" : "schema", + "fields" : [ { + "name" : "myboolean", + "type" : "boolean" + }, { + "name" : "myint", + "type" : "int" + }, { + "name" : "mylong", + "type" : "long" + }, { + "name" : "myfloat", + "type" : "float" + }, { + "name" : "mydouble", + "type" : "double" + }, { + "name" : "mybytes", + "type" : "bytes" + }, { + "name" : "mystring", + "type" : "string" + }, { + "name" : "myrecord", + "type" : { + "type" : "record", + "name" : "myrecord", + "fields" : [ { + "name" : "mynestedint", + "type" : "int" + } ] + } + }, { + "name" : "myarray", + "type" : { + "type" : "array", + "items" : "int" + } + }, { + "name" : "mymap", + "type" : { + "type" : "map", + "values" : "int" + } + } ] +}