diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
index fb885a3..f38d53b 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
@@ -23,11 +23,20 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * This class is a place to put all the code associated with
@@ -82,6 +91,32 @@ public static void addSpecialCasesParametersToOutputJobProperties(
           jobProperties.put(propName,tableProps.get(propName));
         }
       }
+    } else if (ofclass == AvroContainerOutputFormat.class) {
+      // Special cases for Avro. As with ORC, we make table properties that
+      // Avro is interested in available in the jobconf at runtime.
+      Map<String, String> tableProps = jobInfo.getTableInfo().getTable().getParameters();
+      for (AvroSerdeUtils.AvroTableProperties property : AvroSerdeUtils.AvroTableProperties.values()) {
+        String propName = property.getPropName();
+        if (tableProps.containsKey(propName)) {
+          String propVal = tableProps.get(propName);
+          jobProperties.put(propName, propVal);
+        }
+      }
+
+      Properties properties = new Properties();
+      properties.put("name", jobInfo.getTableName());
+
+      // Derive an Avro schema from the output columns and expose it to the
+      // record writers as avro.schema.literal.
+      List<String> colNames = jobInfo.getOutputSchema().getFieldNames();
+      List<TypeInfo> colTypes = new ArrayList<TypeInfo>();
+      for (HCatFieldSchema field : jobInfo.getOutputSchema().getFields()) {
+        colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getTypeString()));
+      }
+
+      jobProperties.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+          AvroSerDe.getSchemaFromCols(properties, colNames, colTypes, null).toString());
+    }
   }
 
 }
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 5eabba1..f97e511 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -101,12 +101,6 @@
   private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
       new HashMap<String, Set<String>>() {{
-        put(IOConstants.AVRO, new HashSet<String>() {{
-          add("testReadDataBasic");
-          add("testReadPartitionedBasic");
-          add("testProjectionsBasic");
-          add("testSchemaLoadPrimitiveTypes");
-        }});
         put(IOConstants.PARQUETFILE, new HashSet<String>() {{
           add("testReadDataBasic");
           add("testReadPartitionedBasic");
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java
index a380f61..86d705c 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java
@@ -76,29 +76,16 @@
   private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
       new HashMap<String, Set<String>>() {{
         put(IOConstants.AVRO, new HashSet<String>() {{
-          add("testBagNStruct");
-          add("testDateCharTypes");
-          add("testDynamicPartitioningMultiPartColsInDataNoSpec");
-          add("testDynamicPartitioningMultiPartColsInDataPartialSpec");
-          add("testMultiPartColsInData");
-          add("testPartColsInData");
-          add("testStoreFuncAllSimpleTypes");
-          add("testStoreFuncSimple");
-          add("testStoreInPartiitonedTbl");
-          add("testStoreMultiTables");
-          add("testStoreWithNoCtorArgs");
-          add("testStoreWithNoSchema");
-          add("testWriteChar");
-          add("testWriteDate");
-          add("testWriteDate2");
-          add("testWriteDate3");
-          add("testWriteDecimal");
-          add("testWriteDecimalX");
-          add("testWriteDecimalXY");
-          add("testWriteSmallint");
-          add("testWriteTimestamp");
-          add("testWriteTinyint");
-          add("testWriteVarchar");
+          add("testDateCharTypes"); // incorrect precision
+            // expected:<0 xxxxx yyy 5.2[]> but was:<0 xxxxx yyy 5.2[0]>
+          add("testWriteDecimalXY"); // incorrect precision
+            // expected:<1.2[]> but was:<1.2[0]>
+          add("testWriteSmallint"); // Avro has no notion of a smallint; it saves the full value as an int, so there is no overflow
+            // expected:<null> but was:<32768>
+          add("testWriteTimestamp"); // Avro does not support timestamp
+            // TypeInfoToSchema.createAvroPrimitive : UnsupportedOperationException
+          add("testWriteTinyint"); // Avro has no notion of a tinyint; it saves the full value as an int, so there is no overflow
+            // expected:<null> but was:<300>
         }});
         put(IOConstants.PARQUETFILE, new HashSet<String>() {{
           add("testBagNStruct");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
index fba0f96..3985cd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
@@ -30,14 +30,17 @@
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,7 +50,9 @@
  * Write to an Avro file from a Hive process.
  */
 public class AvroContainerOutputFormat
-  implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
+  implements HiveOutputFormat<WritableComparable, AvroGenericRecordWritable> {
+
+  public static final Log LOG = LogFactory.getLog(AvroContainerOutputFormat.class);
 
   @Override
   public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
@@ -75,21 +80,67 @@
     return new AvroGenericRecordWriter(dfw);
   }
 
-  //no records will be emitted from Hive
-  @Override
-  public RecordWriter<LongWritable, AvroGenericRecordWritable>
-  getRecordWriter(FileSystem ignored, JobConf job, String name,
-      Progressable progress) {
-    return new RecordWriter<LongWritable, AvroGenericRecordWritable>() {
-      @Override
-      public void write(LongWritable key, AvroGenericRecordWritable value) {
-        throw new RuntimeException("Should not be called");
-      }
-
-      @Override
-      public void close(Reporter reporter) {
-      }
-    };
+  class WrapperRecordWriter<K extends Writable, V extends Writable>
+      implements RecordWriter<K, V> {
+    FileSinkOperator.RecordWriter hiveWriter = null;
+    JobConf jobConf;
+    Progressable progressable;
+    String fileName;
+
+    public WrapperRecordWriter(JobConf jobConf, Progressable progressable, String fileName) {
+      this.progressable = progressable;
+      this.jobConf = jobConf;
+      this.fileName = fileName;
+    }
+
+    // Lazily instantiate the underlying Hive record writer, pulling the Avro
+    // table properties that were stashed in the jobconf by SpecialCases.
+    private FileSinkOperator.RecordWriter getHiveWriter() throws IOException {
+      if (this.hiveWriter == null) {
+        Properties properties = new Properties();
+        for (AvroSerdeUtils.AvroTableProperties tableProperty : AvroSerdeUtils.AvroTableProperties.values()) {
+          String propVal;
+          if ((propVal = jobConf.get(tableProperty.getPropName())) != null) {
+            properties.put(tableProperty.getPropName(), propVal);
+          }
+        }
+
+        boolean isCompressed = jobConf.getBoolean("mapreduce.output.fileoutputformat.compress", false);
+        Path path = new Path(this.fileName);
+        if (path.getFileSystem(jobConf).isDirectory(path)) {
+          // This path is only potentially encountered during setup.
+          // Otherwise, a specific part_xxxx file name is generated and passed in.
+          path = new Path(path, "_dummy");
+        }
+
+        this.hiveWriter = getHiveRecordWriter(jobConf, path, null, isCompressed, properties, progressable);
+      }
+      return this.hiveWriter;
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException {
+      getHiveWriter().write(value);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      // Normally, the blanket false passed in here would be a concern, and would
+      // need to be integrated into an abort call for an OutputCommitter, but the
+      // underlying record writer ignores the flag and throws it away, so it's irrelevant.
+      getHiveWriter().close(false);
+    }
+
+  }
+
+  // Hive itself writes through getHiveRecordWriter() above; this mapred-style
+  // interface is used by clients such as HCatalog, so delegate to the Hive
+  // record writer rather than throwing.
+  @Override
+  public RecordWriter<WritableComparable, AvroGenericRecordWritable>
+      getRecordWriter(FileSystem ignored, JobConf job, String fileName,
+          Progressable progress) throws IOException {
+    return new WrapperRecordWriter<WritableComparable, AvroGenericRecordWritable>(job, progress, fileName);
   }
 
   @Override
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 9695346..8ee12c5 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -92,10 +92,10 @@ public void initialize(Configuration configuration, Properties properties) throw
     final String columnNameProperty = properties.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
-    final String columnCommentProperty = properties.getProperty(LIST_COLUMN_COMMENTS);
+    final String columnCommentProperty = properties.getProperty(LIST_COLUMN_COMMENTS, "");
 
-    if (properties.getProperty(AvroSerdeUtils.SCHEMA_LITERAL) != null
-        || properties.getProperty(AvroSerdeUtils.SCHEMA_URL) != null
+    if (properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()) != null
+        || properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_URL.getPropName()) != null
         || columnNameProperty == null || columnNameProperty.isEmpty()
         || columnTypeProperty == null || columnTypeProperty.isEmpty()) {
       schema = determineSchemaOrReturnErrorSchema(properties);
@@ -104,28 +104,7 @@ public void initialize(Configuration configuration, Properties properties) throw
       columnNames = Arrays.asList(columnNameProperty.split(","));
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
 
-      List<String> columnComments;
-      if (columnCommentProperty.isEmpty()) {
-        columnComments = new ArrayList<String>();
-      } else {
-        columnComments = Arrays.asList(columnCommentProperty.split(","));
-        LOG.info("columnComments is " + columnCommentProperty);
-      }
-      if (columnNames.size() != columnTypes.size()) {
-        throw new IllegalArgumentException("AvroSerde initialization failed. Number of column " +
columnNames = " + columnNames + ", columnTypes = " + - columnTypes); - } - - final String tableName = properties.getProperty(TABLE_NAME); - final String tableComment = properties.getProperty(TABLE_COMMENT); - TypeInfoToSchema typeInfoToSchema = new TypeInfoToSchema(); - schema = typeInfoToSchema.convert(columnNames, columnTypes, columnComments, - properties.getProperty(AvroSerdeUtils.SCHEMA_NAMESPACE), - properties.getProperty(AvroSerdeUtils.SCHEMA_NAME, tableName), - properties.getProperty(AvroSerdeUtils.SCHEMA_DOC, tableComment)); - - properties.setProperty(AvroSerdeUtils.SCHEMA_LITERAL, schema.toString()); + schema = getSchemaFromCols(properties, columnNames, columnTypes, columnCommentProperty); } LOG.info("Avro schema is " + schema); @@ -133,7 +112,8 @@ public void initialize(Configuration configuration, Properties properties) throw if (configuration == null) { LOG.info("Configuration null, not inserting schema"); } else { - configuration.set(AvroSerdeUtils.AVRO_SERDE_SCHEMA, schema.toString(false)); + configuration.set( + AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(), schema.toString(false)); } badSchema = schema.equals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA); @@ -144,6 +124,31 @@ public void initialize(Configuration configuration, Properties properties) throw this.oi = aoig.getObjectInspector(); } + public static Schema getSchemaFromCols(Properties properties, + List columnNames, List columnTypes, String columnCommentProperty) { + List columnComments; + if (columnCommentProperty == null || columnCommentProperty.isEmpty()) { + columnComments = new ArrayList(); + } else { + columnComments = Arrays.asList(columnCommentProperty.split(",")); + LOG.info("columnComments is " + columnCommentProperty); + } + if (columnNames.size() != columnTypes.size()) { + throw new IllegalArgumentException("AvroSerde initialization failed. Number of column " + + "name and column type differs. columnNames = " + columnNames + ", columnTypes = " + + columnTypes); + } + + final String tableName = properties.getProperty(TABLE_NAME); + final String tableComment = properties.getProperty(TABLE_COMMENT); + TypeInfoToSchema typeInfoToSchema = new TypeInfoToSchema(); + return typeInfoToSchema.convert(columnNames, columnTypes, columnComments, + properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_NAMESPACE.getPropName()), + properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_NAME.getPropName(), tableName), + properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_DOC.getPropName(), tableComment)); + + } + /** * Attempt to determine the schema via the usual means, but do not throw * an exception if we fail. Instead, signal failure via a special diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index c0f054f..fa9222a 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -48,16 +48,47 @@ public class AvroSerdeUtils { private static final Log LOG = LogFactory.getLog(AvroSerdeUtils.class); - public static final String SCHEMA_LITERAL = "avro.schema.literal"; - public static final String SCHEMA_URL = "avro.schema.url"; + /** + * Enum container for all avro table properties. + * If introducing a new avro-specific table property, + * add it here. 
Putting them in an enum rather than separate strings + * allows them to be programmatically grouped and referenced together. + */ + public static enum AvroTableProperties { + SCHEMA_LITERAL("avro.schema.literal"), + SCHEMA_URL("avro.schema.url"), + SCHEMA_NAMESPACE("avro.schema.namespace"), + SCHEMA_NAME("avro.schema.name"), + SCHEMA_DOC("avro.schema.doc"), + AVRO_SERDE_SCHEMA("avro.serde.schema"), + SCHEMA_RETRIEVER("avro.schema.retriever"); + + private final String propName; + + AvroTableProperties(String propName) { + this.propName = propName; + } + + public String getPropName(){ + return this.propName; + } + } + + // Following parameters slated for removal, prefer usage of enum above, that allows programmatic access. + @Deprecated public static final String SCHEMA_LITERAL = "avro.schema.literal"; + @Deprecated public static final String SCHEMA_URL = "avro.schema.url"; + @Deprecated public static final String SCHEMA_NAMESPACE = "avro.schema.namespace"; + @Deprecated public static final String SCHEMA_NAME = "avro.schema.name"; + @Deprecated public static final String SCHEMA_DOC = "avro.schema.doc"; + @Deprecated public static final String AVRO_SERDE_SCHEMA = AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(); + @Deprecated public static final String SCHEMA_RETRIEVER = AvroTableProperties.SCHEMA_RETRIEVER.getPropName(); + public static final String SCHEMA_NONE = "none"; - public static final String SCHEMA_NAMESPACE = "avro.schema.namespace"; - public static final String SCHEMA_NAME = "avro.schema.name"; - public static final String SCHEMA_DOC = "avro.schema.doc"; - public static final String EXCEPTION_MESSAGE = "Neither " + SCHEMA_LITERAL + " nor " - + SCHEMA_URL + " specified, can't determine table schema"; - public static final String AVRO_SERDE_SCHEMA = "avro.serde.schema"; - public static final String SCHEMA_RETRIEVER = "avro.schema.retriever"; + public static final String EXCEPTION_MESSAGE = "Neither " + + AvroTableProperties.SCHEMA_LITERAL.getPropName() + " nor " + + AvroTableProperties.SCHEMA_URL.getPropName() + " specified, can't determine table schema"; + + /** * Determine the schema to that's been provided for Avro serde work. @@ -68,12 +99,12 @@ */ public static Schema determineSchemaOrThrowException(Properties properties) throws IOException, AvroSerdeException { - String schemaString = properties.getProperty(SCHEMA_LITERAL); + String schemaString = properties.getProperty(AvroTableProperties.SCHEMA_LITERAL.getPropName()); if(schemaString != null && !schemaString.equals(SCHEMA_NONE)) return AvroSerdeUtils.getSchemaFor(schemaString); // Try pulling directly from URL - schemaString = properties.getProperty(SCHEMA_URL); + schemaString = properties.getProperty(AvroTableProperties.SCHEMA_URL.getPropName()); if(schemaString == null || schemaString.equals(SCHEMA_NONE)) throw new AvroSerdeException(EXCEPTION_MESSAGE);