diff --git build.xml build.xml
index d027e6a..6323352 100644
--- build.xml
+++ build.xml
@@ -141,7 +141,7 @@
-
+
@@ -152,7 +152,7 @@
-
+
diff --git serde/ivy.xml serde/ivy.xml
index 88a6da8..328baa4 100644
--- serde/ivy.xml
+++ serde/ivy.xml
@@ -45,5 +45,9 @@
transitive="false"/>
+
+
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroContainerInputFormat.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroContainerInputFormat.java
new file mode 100644
index 0000000..ee9c0fd
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroContainerInputFormat.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AvroContainerInputFormat extends FileInputFormat<NullWritable, AvroGenericRecordWritable> implements JobConfigurable {
+ protected JobConf jobConf;
+
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(job))
+ // TODO: How to have output files end with .avro?
+ //if (file.getPath().getName().endsWith(".avro"))
+ result.add(file);
+ return result.toArray(new FileStatus[0]);
+ }
+
+ @Override
+  public RecordReader<NullWritable, AvroGenericRecordWritable> getRecordReader(InputSplit inputSplit, JobConf jc, Reporter reporter) throws IOException {
+ return new AvroGenericRecordReader(jc, (FileSplit) inputSplit, reporter);
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroContainerOutputFormat.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroContainerOutputFormat.java
new file mode 100644
index 0000000..c4192c5
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroContainerOutputFormat.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+
+/**
+ * Write to an Avro file from a Hive process.
+ */
+public class AvroContainerOutputFormat implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+ Path path,
+                         Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties properties,
+ Progressable progressable) throws IOException {
+ Schema schema;
+ try {
+ schema = AvroSerdeUtils.determineSchemaOrThrowException(properties);
+ } catch (AvroSerdeException e) {
+ throw new IOException(e);
+ }
+    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
+    DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
+
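+    // Compression is driven by Avro's standard job settings: OUTPUT_CODEC
+    // selects the codec (deflate by default) and DEFLATE_LEVEL_KEY sets the
+    // level used by the deflate codec.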
+ if (isCompressed) {
+ int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+ String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC);
+ CodecFactory factory = codecName.equals(DEFLATE_CODEC)
+ ? CodecFactory.deflateCodec(level)
+ : CodecFactory.fromString(codecName);
+ dfw.setCodec(factory);
+ }
+
+ dfw.create(schema, path.getFileSystem(jobConf).create(path));
+ return new AvroGenericRecordWriter(dfw);
+ }
+
+ //no records will be emitted from Hive
+ @Override
+  public RecordWriter<LongWritable, AvroGenericRecordWritable> getRecordWriter(FileSystem ignored, JobConf job, String name,
+ Progressable progress) {
+    return new RecordWriter<LongWritable, AvroGenericRecordWritable>() {
+ public void write(LongWritable key, AvroGenericRecordWritable value) {
+ throw new RuntimeException("Should not be called");
+ }
+
+ public void close(Reporter reporter) {
+ }
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ return; // Not doing any check
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
new file mode 100644
index 0000000..4131718
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
+import org.apache.hadoop.io.Writable;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+class AvroDeserializer {
+ private static final Log LOG = LogFactory.getLog(AvroDeserializer.class);
+ /**
+ * When encountering a record with an older schema than the one we're trying
+ * to read, it is necessary to re-encode with a reader against the newer schema.
+ * Because Hive doesn't provide a way to pass extra information to the
+ * inputformat, we're unable to provide the newer schema when we have it and it
+ * would be most useful - when the inputformat is reading the file.
+ *
+ * This is a slow process, so we try to cache as many of the objects as possible.
+ */
+ static class SchemaReEncoder {
+ private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
+ private BinaryDecoder binaryDecoder = null;
+    private InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>> gdrCache
+      = new InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>>() {
+        @Override
+        protected GenericDatumReader<GenericRecord> makeInstance(ReaderWriterSchemaPair hv) {
+          return new GenericDatumReader<GenericRecord>(hv.getWriter(), hv.getReader());
+        }
+      };
+
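+    // reencode() round-trips the record: write it out with its own (writer)
+    // schema, then read the bytes back through a cached
+    // GenericDatumReader(writerSchema, readerSchema) so that Avro's standard
+    // schema-resolution rules produce a record matching the reader schema.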
+ public GenericRecord reencode(GenericRecord r, Schema readerSchema) throws AvroSerdeException {
+ baos.reset();
+
+ BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(baos, null);
+ gdw.setSchema(r.getSchema());
+ try {
+ gdw.write(r, be);
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+
+ binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder);
+
+ ReaderWriterSchemaPair pair = new ReaderWriterSchemaPair(r.getSchema(), readerSchema);
+        GenericDatumReader<GenericRecord> gdr = gdrCache.retrieve(pair);
+ return gdr.read(r, binaryDecoder);
+
+ } catch (IOException e) {
+ throw new AvroSerdeException("Exception trying to re-encode record to new schema", e);
+ }
+ }
+ }
+
+  private List<Object> row;
+ private SchemaReEncoder reEncoder;
+
+ /**
+ * Deserialize an Avro record, recursing into its component fields and
+ * deserializing them as well. Fields of the record are matched by name
+ * against fields in the Hive row.
+ *
+ * Because Avro has some data types that Hive does not, these are converted
+ * during deserialization to types Hive will work with.
+ *
+ * @param columnNames List of columns Hive is expecting from record.
+ * @param columnTypes List of column types matched by index to names
+   * @param writable Instance of AvroGenericRecordWritable to deserialize
+ * @param readerSchema Schema of the writable to deserialize
+ * @return A list of objects suitable for Hive to work with further
+   * @throws AvroSerdeException For any exception during deserialization
+ */
+  public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes, Writable writable, Schema readerSchema) throws AvroSerdeException {
+ if(!(writable instanceof AvroGenericRecordWritable))
+ throw new AvroSerdeException("Expecting a AvroGenericRecordWritable");
+
+ if(row == null || row.size() != columnNames.size())
+      row = new ArrayList<Object>(columnNames.size());
+ else
+ row.clear();
+
+ AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
+ GenericRecord r = recordWritable.getRecord();
+
+ // Check if we're working with an evolved schema
+ if(!r.getSchema().equals(readerSchema)) {
+ LOG.warn("Received different schemas. Have to re-encode: " + r.getSchema().toString(false));
+ if(reEncoder == null) reEncoder = new SchemaReEncoder();
+ r = reEncoder.reencode(r, readerSchema);
+ }
+
+ workerBase(row, columnNames, columnTypes, r);
+ return row;
+ }
+
+ // The actual deserialization may involve nested records, which require recursion.
+  private List<Object> workerBase(List<Object> objectRow, List<String> columnNames, List<TypeInfo> columnTypes, GenericRecord record) throws AvroSerdeException {
+ for(int i = 0; i < columnNames.size(); i++) {
+ TypeInfo columnType = columnTypes.get(i);
+ String columnName = columnNames.get(i);
+ Object datum = record.get(columnName);
+ Schema datumSchema = record.getSchema().getField(columnName).schema();
+
+ objectRow.add(worker(datum, datumSchema, columnType));
+ }
+
+ return objectRow;
+ }
+
+ private Object worker(Object datum, Schema recordSchema, TypeInfo columnType) throws AvroSerdeException {
+ // Klaxon! Klaxon! Klaxon!
+ // Avro requires NULLable types to be defined as unions of some type T
+ // and NULL. This is annoying and we're going to hide it from the user.
+ if(AvroSerdeUtils.isNullableType(recordSchema))
+ return deserializeNullableUnion(datum, recordSchema, columnType);
+
+ if(columnType == TypeInfoFactory.stringTypeInfo)
+      return datum.toString(); // To work around Avro's Utf8 type
+ // This also gets us around the Enum issue since we just take the value and convert it to a string. Yay!
+
+ switch(columnType.getCategory()) {
+ case STRUCT:
+ return deserializeStruct((GenericData.Record) datum, (StructTypeInfo) columnType);
+ case UNION:
+ return deserializeUnion(datum, recordSchema, (UnionTypeInfo) columnType);
+ case LIST:
+ return deserializeList(datum, recordSchema, (ListTypeInfo) columnType);
+ case MAP:
+ return deserializeMap(datum, recordSchema, (MapTypeInfo) columnType);
+ default:
+ return datum; // Simple type.
+ }
+ }
+
+ /**
+ * Extract either a null or the correct type from a Nullable type. This is
+ * horrible in that we rebuild the TypeInfo every time.
+ */
+ private Object deserializeNullableUnion(Object datum, Schema recordSchema, TypeInfo columnType) throws AvroSerdeException {
+ int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
+ Schema schema = recordSchema.getTypes().get(tag);
+ if(schema.getType().equals(Schema.Type.NULL))
+ return null;
+ return worker(datum, schema, SchemaToTypeInfo.generateTypeInfo(schema));
+
+ }
+
+ private Object deserializeStruct(GenericData.Record datum, StructTypeInfo columnType) throws AvroSerdeException {
+ // No equivalent Java type for the backing structure, need to recurse and build a list
+    ArrayList<TypeInfo> innerFieldTypes = columnType.getAllStructFieldTypeInfos();
+    ArrayList<String> innerFieldNames = columnType.getAllStructFieldNames();
+    List<Object> innerObjectRow = new ArrayList<Object>(innerFieldTypes.size());
+
+ return workerBase(innerObjectRow, innerFieldNames, innerFieldTypes, datum);
+ }
+
+ private Object deserializeUnion(Object datum, Schema recordSchema, UnionTypeInfo columnType) throws AvroSerdeException {
+ int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
+ Object desered = worker(datum, recordSchema.getTypes().get(tag), columnType.getAllUnionObjectTypeInfos().get(tag));
+ return new StandardUnionObjectInspector.StandardUnion((byte)tag, desered);
+ }
+
+ private Object deserializeList(Object datum, Schema recordSchema, ListTypeInfo columnType) throws AvroSerdeException {
+ // Need to check the original schema to see if this is actually a Fixed.
+ if(recordSchema.getType().equals(Schema.Type.FIXED)) {
+      // We're faking out Hive to work through a type system impedance mismatch. Pull out the backing array and convert to a list.
+ GenericData.Fixed fixed = (GenericData.Fixed) datum;
+      List<Byte> asList = new ArrayList<Byte>(fixed.bytes().length);
+ for(int j = 0; j < fixed.bytes().length; j++) {
+ asList.add(fixed.bytes()[j]);
+ }
+ return asList;
+ } else if(recordSchema.getType().equals(Schema.Type.BYTES)) {
+ // This is going to be slow... hold on.
+ ByteBuffer bb = (ByteBuffer)datum;
+      List<Byte> asList = new ArrayList<Byte>(bb.capacity());
+ byte[] array = bb.array();
+ for(int j = 0; j < array.length; j++) {
+ asList.add(array[j]);
+ }
+ return asList;
+ } else { // An actual list, deser its values
+ List listData = (List) datum;
+ Schema listSchema = recordSchema.getElementType();
+      List<Object> listContents = new ArrayList<Object>(listData.size());
+ for(Object obj : listData) {
+ listContents.add(worker(obj, listSchema, columnType.getListElementTypeInfo()));
+ }
+ return listContents;
+ }
+ }
+
+ private Object deserializeMap(Object datum, Schema mapSchema, MapTypeInfo columnType) throws AvroSerdeException {
+ // Avro only allows maps with Strings for keys, so we only have to worry
+ // about deserializing the values
+    Map<String, Object> map = new Hashtable<String, Object>();
+    Map<Utf8, Object> mapDatum = (Map<Utf8, Object>) datum;
+ Schema valueSchema = mapSchema.getValueType();
+ TypeInfo valueTypeInfo = columnType.getMapValueTypeInfo();
+ for (Utf8 key : mapDatum.keySet()) {
+ Object value = mapDatum.get(key);
+ map.put(key.toString(), worker(value, valueSchema, valueTypeInfo));
+ }
+
+ return map;
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordReader.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordReader.java
new file mode 100644
index 0000000..4b10ac5
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordReader.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.*;
+
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * RecordReader optimized against Avro GenericRecords that returns the record
+ * as the value of the key-value pair, as Hive requires.
+ */
+public class AvroGenericRecordReader implements RecordReader<NullWritable, AvroGenericRecordWritable>, JobConfigurable {
+ private static final Log LOG = LogFactory.getLog(AvroGenericRecordReader.class);
+
+  final private org.apache.avro.file.FileReader<GenericRecord> reader;
+ final private long start;
+ final private long stop;
+ protected JobConf jobConf;
+
+ public AvroGenericRecordReader(JobConf job, FileSplit split, Reporter reporter) throws IOException {
+ this.jobConf = job;
+ Schema latest;
+
+ try {
+ latest = getSchema(job, split);
+ } catch (AvroSerdeException e) {
+ throw new IOException(e);
+ }
+
+    GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
+
+ if(latest != null) gdr.setExpected(latest);
+
+    this.reader = new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
+ this.reader.sync(split.getStart());
+ this.start = reader.tell();
+ this.stop = split.getStart() + split.getLength();
+ }
+
+ /**
+   * Attempt to retrieve the reader schema. We have a couple of opportunities
+   * to provide this, depending on whether we're just selecting data
+   * or running a full MR job.
+ * @return Reader schema for the Avro object, or null if it has not been provided.
+ * @throws AvroSerdeException
+ */
+ private Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException, IOException {
+ FileSystem fs = split.getPath().getFileSystem(job);
+ // Inside of a MR job, we can pull out the actual properties
+ if(AvroSerdeUtils.insideMRJob(job)) {
+ MapredWork mapRedWork = Utilities.getMapRedWork(job);
+
+ // Iterate over the Path -> Partition descriptions to find the partition
+ // that matches our input split.
+      for (Map.Entry<String, PartitionDesc> pathsAndParts : mapRedWork.getPathToPartitionInfo().entrySet()) {
+ String partitionPath = pathsAndParts.getKey();
+ if(pathIsInPartition(split.getPath().makeQualified(fs), partitionPath)) {
+ if(LOG.isInfoEnabled()) LOG.info("Matching partition " + partitionPath + " with input split " + split);
+
+ Properties props = pathsAndParts.getValue().getProperties();
+ if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) {
+ return AvroSerdeUtils.determineSchemaOrThrowException(props);
+ } else
+ return null; // If it's not in this property, it won't be in any others
+ }
+ }
+ if(LOG.isInfoEnabled()) LOG.info("Unable to match filesplit " + split + " with a partition.");
+ }
+
+ // In "select * from table" situations (non-MR), we can add things to the job
+ // It's safe to add this to the job since it's not *actually* a mapred job.
+ // Here the global state is confined to just this process.
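+    // (AvroSerDe.initialize() is what stores the schema in the job under
+    // AvroSerdeUtils.AVRO_SERDE_SCHEMA.)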
+ String s = job.get(AvroSerdeUtils.AVRO_SERDE_SCHEMA);
+ if(s != null) {
+ LOG.info("Found the avro schema in the job: " + s);
+ return Schema.parse(s);
+ }
+ // No more places to get the schema from. Give up. May have to re-encode later.
+ return null;
+ }
+
+ private boolean pathIsInPartition(Path split, String partitionPath) {
+ return split.toString().startsWith(partitionPath);
+ }
+
+
+ @Override
+ public boolean next(NullWritable nullWritable, AvroGenericRecordWritable record) throws IOException {
+ if(!reader.hasNext() || reader.pastSync(stop)) return false;
+
+ GenericData.Record r = (GenericData.Record)reader.next();
+ record.setRecord(r);
+
+ return true;
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public AvroGenericRecordWritable createValue() {
+ return new AvroGenericRecordWritable();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return reader.tell();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return stop == start ? 0.0f
+ : Math.min(1.0f, (getPos() - start) / (float)(stop - start));
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+    this.jobConf = jobConf;
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
new file mode 100644
index 0000000..aeb5345
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.io.Writable;
+
+import java.io.*;
+
+/**
+ * Wrapper around an Avro GenericRecord. Necessary because Hive's deserializer
+ * will happily deserialize any object - as long as it's a writable.
+ */
+public class AvroGenericRecordWritable implements Writable {
+ GenericRecord record;
+ private BinaryDecoder binaryDecoder;
+
+ // There are two areas of exploration for optimization here.
+ // 1. We're serializing the schema with every object. If we assume the schema
+ // provided by the table is always correct, we don't need to do this and
+ // and can just send the serialized bytes.
+ // 2. We serialize/deserialize to/from bytes immediately. We may save some
+  //    time by doing this lazily, but until there's evidence this is useful,
+ // it's not worth adding the extra state.
+ public GenericRecord getRecord() {
+ return record;
+ }
+
+ public void setRecord(GenericRecord record) {
+ this.record = record;
+ }
+
+ public AvroGenericRecordWritable() {}
+
+ public AvroGenericRecordWritable(GenericRecord record) {
+ this.record = record;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // Write schema since we need it to pull the data out. (see point #1 above)
+ String schemaString = record.getSchema().toString(false);
+ out.writeUTF(schemaString);
+
+ // Write record to byte buffer
+    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
+ BinaryEncoder be = EncoderFactory.get().directBinaryEncoder((DataOutputStream)out, null);
+
+ gdw.setSchema(record.getSchema());
+ gdw.write(record, be);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
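+    // Mirror of write(): read the schema back from its UTF string form, then
+    // decode the binary-encoded record against it.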
+ Schema schema = Schema.parse(in.readUTF());
+ record = new GenericData.Record(schema);
+ binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder((InputStream) in, binaryDecoder);
+    GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(schema);
+ record = gdr.read(record, binaryDecoder);
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWriter.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWriter.java
new file mode 100644
index 0000000..6687892
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+/**
+ * Write an Avro GenericRecord to an Avro data file.
+ */
+public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter {
+  final private DataFileWriter<GenericRecord> dfw;
+
+  public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {
+ this.dfw = dfw;
+ }
+
+ @Override
+ public void write(Writable writable) throws IOException {
+ if(!(writable instanceof AvroGenericRecordWritable))
+ throw new IOException("Expecting instance of AvroGenericRecordWritable, but received" + writable.getClass().getCanonicalName());
+ AvroGenericRecordWritable r = (AvroGenericRecordWritable)writable;
+ dfw.append(r.getRecord());
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ dfw.close();
+ }
+
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java
new file mode 100644
index 0000000..194e9e8
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An AvroObjectInspectorGenerator takes an Avro schema and creates the three
+ * data structures Hive needs to work with Avro-encoded data:
+ * * A list of the schema field names
+ *   * A list of those fields' equivalent types in Hive
+ * * An ObjectInspector capable of working with an instance of that datum.
+ */
+class AvroObjectInspectorGenerator {
+  final private List<String> columnNames;
+  final private List<TypeInfo> columnTypes;
+ final private ObjectInspector oi;
+
+ public AvroObjectInspectorGenerator(Schema schema) throws SerDeException {
+ verifySchemaIsARecord(schema);
+
+ this.columnNames = generateColumnNames(schema);
+ this.columnTypes = SchemaToTypeInfo.generateColumnTypes(schema);
+ assert columnNames.size() == columnTypes.size();
+ this.oi = createObjectInspector();
+ }
+
+ private void verifySchemaIsARecord(Schema schema) throws SerDeException {
+ if(!schema.getType().equals(Schema.Type.RECORD))
+ throw new AvroSerdeException("Schema for table must be of type RECORD. " +
+ "Received type: " + schema.getType());
+ }
+
+  public List<String> getColumnNames() {
+ return columnNames;
+ }
+
+  public List<TypeInfo> getColumnTypes() {
+ return columnTypes;
+ }
+
+ public ObjectInspector getObjectInspector() {
+ return oi;
+ }
+
+ private ObjectInspector createObjectInspector() throws SerDeException {
+    List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size());
+
+ // At this point we've verified the types are correct.
+ for(int i = 0; i < columnNames.size(); i++) {
+ columnOIs.add(i, createObjectInspectorWorker(columnTypes.get(i)));
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+ }
+
+ private ObjectInspector createObjectInspectorWorker(TypeInfo ti) throws SerDeException {
+ // We don't need to do the check for U[T,Null] here because we'll give the real type
+ // at deserialization and the object inspector will never see the actual union.
+ if(!supportedCategories(ti))
+ throw new AvroSerdeException("Don't yet support this type: " + ti);
+ ObjectInspector result;
+ switch(ti.getCategory()) {
+ case PRIMITIVE:
+ PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+ result = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(pti.getPrimitiveCategory());
+ break;
+ case STRUCT:
+ StructTypeInfo sti = (StructTypeInfo)ti;
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(sti.getAllStructFieldTypeInfos().size());
+ for(TypeInfo typeInfo : sti.getAllStructFieldTypeInfos()) {
+ ois.add(createObjectInspectorWorker(typeInfo));
+ }
+
+ result = ObjectInspectorFactory.getStandardStructObjectInspector(sti.getAllStructFieldNames(), ois);
+
+ break;
+ case MAP:
+ MapTypeInfo mti = (MapTypeInfo)ti;
+ result = ObjectInspectorFactory.getStandardMapObjectInspector(
+ PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING),
+ createObjectInspectorWorker(mti.getMapValueTypeInfo()));
+ break;
+ case LIST:
+ ListTypeInfo ati = (ListTypeInfo)ti;
+ result = ObjectInspectorFactory.getStandardListObjectInspector(createObjectInspectorWorker(ati.getListElementTypeInfo()));
+ break;
+ case UNION:
+ UnionTypeInfo uti = (UnionTypeInfo)ti;
+        List<TypeInfo> allUnionObjectTypeInfos = uti.getAllUnionObjectTypeInfos();
+        List<ObjectInspector> unionObjectInspectors = new ArrayList<ObjectInspector>(allUnionObjectTypeInfos.size());
+
+ for (TypeInfo typeInfo : allUnionObjectTypeInfos) {
+ unionObjectInspectors.add(createObjectInspectorWorker(typeInfo));
+ }
+
+ result = ObjectInspectorFactory.getStandardUnionObjectInspector(unionObjectInspectors);
+ break;
+ default:
+ throw new AvroSerdeException("No Hive categories matched: " + ti);
+ }
+
+ return result;
+ }
+
+ private boolean supportedCategories(TypeInfo ti) {
+ final ObjectInspector.Category c = ti.getCategory();
+ return c.equals(ObjectInspector.Category.PRIMITIVE) ||
+ c.equals(ObjectInspector.Category.MAP) ||
+ c.equals(ObjectInspector.Category.LIST) ||
+ c.equals(ObjectInspector.Category.STRUCT) ||
+ c.equals(ObjectInspector.Category.UNION);
+ }
+
+  private List<String> generateColumnNames(Schema schema) {
+    List<Schema.Field> fields = schema.getFields();
+    List<String> fieldsList = new ArrayList<String>(fields.size());
+
+ for (Schema.Field field : fields) {
+ fieldsList.add(field.name());
+ }
+
+ return fieldsList;
+ }
+
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
new file mode 100644
index 0000000..b0ee3dc
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Read or write Avro data from Hive.
+ */
+public class AvroSerDe implements SerDe {
+ private static final Log LOG = LogFactory.getLog(AvroSerDe.class);
+ private ObjectInspector oi;
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+ private Schema schema;
+ private AvroDeserializer avroDeserializer = null;
+ private AvroSerializer avroSerializer = null;
+
+ private boolean badSchema = false;
+
+ @Override
+ public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+ // Reset member variables so we don't get in a half-constructed state
+ if(schema != null)
+ LOG.info("Resetting already initialized AvroSerDe");
+
+ schema = null;
+ oi = null;
+ columnNames = null;
+ columnTypes = null;
+
+ properties = determineCorrectProperties(configuration, properties);
+
+ schema = AvroSerdeUtils.determineSchemaOrReturnErrorSchema(properties);
+ if(configuration == null) {
+ LOG.info("Configuration null, not inserting schema");
+ } else {
+ // force output files to have a .avro extension
+ configuration.set("hive.output.file.extension", ".avro");
+ configuration.set(AvroSerdeUtils.AVRO_SERDE_SCHEMA, schema.toString(false));
+ }
+
+ badSchema = schema.equals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(schema);
+ this.columnNames = aoig.getColumnNames();
+ this.columnTypes = aoig.getColumnTypes();
+ this.oi = aoig.getObjectInspector();
+ }
+
+ // Hive passes different properties in at different times. If we're in a MR job,
+ // we'll get properties for the partition rather than the table, which will give
+ // us old values for the schema (if it's evolved). Therefore, in an MR job
+ // we need to extract the table properties.
+ // Also, in join queries, multiple properties will be included, so we need
+ // to extract out the one appropriate to the table we're serde'ing.
+ private Properties determineCorrectProperties(Configuration configuration, Properties properties) {
+ if((configuration instanceof JobConf) && AvroSerdeUtils.insideMRJob((JobConf) configuration)) {
+ LOG.info("In MR job, extracting table-level properties");
+ MapredWork mapRedWork = Utilities.getMapRedWork(configuration);
+      LinkedHashMap<String, PartitionDesc> a = mapRedWork.getAliasToPartnInfo();
+ if(a.size() == 1) {
+ LOG.info("Only one PartitionDesc found. Returning that Properties");
+ PartitionDesc p = a.values().iterator().next();
+ TableDesc tableDesc = p.getTableDesc();
+ return tableDesc.getProperties();
+ } else {
+ String tableName = properties.getProperty("name");
+ LOG.info("Multiple PartitionDescs. Return properties for " + tableName);
+
+        for (Map.Entry<String, PartitionDesc> partitionDescs : a.entrySet()) {
+ Properties p = partitionDescs.getValue().getTableDesc().getProperties();
+ if(p.get("name").equals(tableName)) {
+ // We've found the matching table partition
+ LOG.info("Matched table name against " + partitionDescs.getKey() + ", return its properties");
+ return p;
+ }
+ }
+ // Didn't find anything in partitions to match on. WARN, at least.
+ LOG.warn("Couldn't find any matching properties for table: " +
+ tableName + ". Returning original properties");
+ }
+
+ }
+ return properties;
+ }
+
+ @Override
+  public Class<? extends Writable> getSerializedClass() {
+ return AvroGenericRecordWritable.class;
+ }
+
+ @Override
+ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ if(badSchema) throw new BadSchemaException();
+ return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
+ }
+
+ @Override
+ public Object deserialize(Writable writable) throws SerDeException {
+ if(badSchema) throw new BadSchemaException();
+ return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return oi;
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ // No support for statistics. That seems to be a popular answer.
+ return null;
+ }
+
+ private AvroDeserializer getDeserializer() {
+ if(avroDeserializer == null) avroDeserializer = new AvroDeserializer();
+
+ return avroDeserializer;
+ }
+
+ private AvroSerializer getSerializer() {
+ if(avroSerializer == null) avroSerializer = new AvroSerializer();
+
+ return avroSerializer;
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeException.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeException.java
new file mode 100644
index 0000000..cd36ada
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+public class AvroSerdeException extends SerDeException {
+ public AvroSerdeException() {
+ super();
+ }
+
+ public AvroSerdeException(String message) {
+ super(message);
+ }
+
+ public AvroSerdeException(Throwable cause) {
+ super(cause);
+ }
+
+ public AvroSerdeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
new file mode 100644
index 0000000..02d4a45
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+
+class AvroSerdeUtils {
+ private static final Log LOG = LogFactory.getLog(AvroSerdeUtils.class);
+
+ public static final String SCHEMA_LITERAL = "schema.literal";
+ public static final String SCHEMA_URL = "schema.url";
+ public static final String SCHEMA_NONE = "none";
+ public static final String EXCEPTION_MESSAGE = "Neither " + SCHEMA_LITERAL + " nor "
+ + SCHEMA_URL + " specified, can't determine table schema";
+ public static final String AVRO_SERDE_SCHEMA = "avro.serde.schema";
+
+ /**
+   * Determine the schema that's been provided for Avro serde work.
+ * @param properties containing a key pointing to the schema, one way or another
+ * @return schema to use while serdeing the avro file
+ * @throws IOException if error while trying to read the schema from another location
+ * @throws AvroSerdeException if unable to find a schema or pointer to it in the properties
+ */
+ public static Schema determineSchemaOrThrowException(Properties properties) throws IOException, AvroSerdeException {
+ String schemaString = properties.getProperty(SCHEMA_LITERAL);
+ if(schemaString != null && !schemaString.equals(SCHEMA_NONE))
+ return Schema.parse(schemaString);
+
+ // Try pulling directly from URL
+ schemaString = properties.getProperty(SCHEMA_URL);
+ if(schemaString == null || schemaString.equals(SCHEMA_NONE))
+ throw new AvroSerdeException(EXCEPTION_MESSAGE);
+
+ try {
+ if(schemaString.toLowerCase().startsWith("hdfs://"))
+ return getSchemaFromHDFS(schemaString, new Configuration());
+ } catch(IOException ioe) {
+ throw new AvroSerdeException("Unable to read schema from HDFS: " + schemaString, ioe);
+ }
+
+ return Schema.parse(new URL(schemaString).openStream());
+ }
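+  // For example, a table declared with
+  //   TBLPROPERTIES('schema.literal'='{"type":"record", ... }')
+  // or
+  //   TBLPROPERTIES('schema.url'='hdfs://namenode/path/to/table.avsc')
+  // resolves its schema here; hdfs:// URLs are read via getSchemaFromHDFS.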
+
+ /**
+ * Attempt to determine the schema via the usual means, but do not throw
+ * an exception if we fail. Instead, signal failure via a special
+ * schema. This is used because Hive calls init on the serde during
+ * any call, including calls to update the serde properties, meaning
+ * if the serde is in a bad state, there is no way to update that state.
+ */
+ public static Schema determineSchemaOrReturnErrorSchema(Properties props) {
+ try {
+ return determineSchemaOrThrowException(props);
+ } catch(AvroSerdeException he) {
+ LOG.warn("Encountered AvroSerdeException determining schema. Returning signal schema to indicate problem", he);
+ return SchemaResolutionProblem.SIGNAL_BAD_SCHEMA;
+ } catch (Exception e) {
+ LOG.warn("Encountered exception determining schema. Returning signal schema to indicate problem", e);
+ return SchemaResolutionProblem.SIGNAL_BAD_SCHEMA;
+ }
+ }
+  // Protected so that tests can pass in their own conf.
+ protected static Schema getSchemaFromHDFS(String schemaHDFSUrl, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FSDataInputStream in = null;
+
+ try {
+ in = fs.open(new Path(schemaHDFSUrl));
+ Schema s = Schema.parse(in);
+ return s;
+ } finally {
+ if(in != null) in.close();
+ }
+ }
+
+ /**
+ * Determine if an Avro schema is of type Union[T, NULL]. Avro supports nullable
+ * types via a union of type T and null. This is a very common use case.
+ * As such, we want to silently convert it to just T and allow the value to be null.
+ *
+ * @return true if type represents Union[T, Null], false otherwise
+ */
+ public static boolean isNullableType(Schema schema) {
+ return schema.getType().equals(Schema.Type.UNION) &&
+ schema.getTypes().size() == 2 &&
+ (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) || // [null, null] not allowed, so this check is ok.
+ schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
+ }
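+  // For example, the schema ["null","string"] is nullable and is surfaced to
+  // Hive as a plain string column whose values may be null.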
+
+ /**
+ * In a nullable type, get the schema for the non-nullable type. This method
+   * does no checking that the provided Schema is nullable.
+ */
+ public static Schema getOtherTypeFromNullableType(Schema schema) {
+    List<Schema> types = schema.getTypes();
+
+ return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
+ }
+
+ /**
+ * Determine if we're being executed from within an MR job or as part
+ * of a select * statement. The signals for this varies between Hive versions.
+ * @param job that contains things that are or are not set in a job
+ * @return Are we in a job or not?
+ */
+ static boolean insideMRJob(JobConf job) {
+ return job != null
+ && (HiveConf.getVar(job, HiveConf.ConfVars.PLAN) != null)
+ && (!HiveConf.getVar(job, HiveConf.ConfVars.PLAN).isEmpty());
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
new file mode 100644
index 0000000..b8f5f31
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
+import org.apache.hadoop.io.Writable;
+import static org.apache.avro.Schema.Type.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+class AvroSerializer {
+ private static final Log LOG = LogFactory.getLog(AvroSerializer.class);
+
+ AvroGenericRecordWritable cache = new AvroGenericRecordWritable();
+
+ // Hive is pretty simple (read: stupid) in writing out values via the serializer.
+ // We're just going to go through, matching indices. Hive formats normally
+ // handle mismatches with null. We don't have that option, so instead we'll
+ // end up throwing an exception for invalid records.
+  public Writable serialize(Object o, ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes, Schema schema) throws AvroSerdeException {
+ StructObjectInspector soi = (StructObjectInspector) objectInspector;
+ GenericData.Record record = new GenericData.Record(schema);
+
+    List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
+ if(outputFieldRefs.size() != columnNames.size())
+ throw new AvroSerdeException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size());
+
+ int size = schema.getFields().size();
+ if(outputFieldRefs.size() != size) // Hive does this check for us, so we should be ok.
+ throw new AvroSerdeException("Hive passed in a different number of fields than the schema expected: (Hive wanted " + outputFieldRefs.size() +", Avro expected " + schema.getFields().size());
+
+    List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
+
+ for(int i = 0; i < size; i++) {
+ Field field = schema.getFields().get(i);
+ TypeInfo typeInfo = columnTypes.get(i);
+ StructField structFieldRef = allStructFieldRefs.get(i);
+ Object structFieldData = structFieldsDataAsList.get(i);
+ ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+
+ Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
+ record.put(field.name(), val);
+ }
+
+ if(!GenericData.get().validate(schema, record))
+ throw new SerializeToAvroException(schema, record);
+
+ cache.setRecord(record);
+
+ return cache;
+ }
+
+ private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ switch(typeInfo.getCategory()) {
+ case PRIMITIVE:
+ assert fieldOI instanceof PrimitiveObjectInspector;
+ return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData);
+ case MAP:
+ assert fieldOI instanceof MapObjectInspector;
+ assert typeInfo instanceof MapTypeInfo;
+ return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema);
+ case LIST:
+ assert fieldOI instanceof ListObjectInspector;
+ assert typeInfo instanceof ListTypeInfo;
+ return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema);
+ case UNION:
+ assert fieldOI instanceof UnionObjectInspector;
+ assert typeInfo instanceof UnionTypeInfo;
+ return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema);
+ case STRUCT:
+ assert fieldOI instanceof StructObjectInspector;
+ assert typeInfo instanceof StructTypeInfo;
+ return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema);
+ default:
+ throw new AvroSerdeException("Ran out of TypeInfo Categories: " + typeInfo.getCategory());
+ }
+ }
+
+ private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) throws AvroSerdeException {
+ int size = schema.getFields().size();
+    List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
+ GenericData.Record record = new GenericData.Record(schema);
+    ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+
+ for(int i = 0; i < size; i++) {
+ Field field = schema.getFields().get(i);
+ TypeInfo colTypeInfo = allStructFieldTypeInfos.get(i);
+ StructField structFieldRef = allStructFieldRefs.get(i);
+ Object structFieldData = structFieldsDataAsList.get(i);
+ ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+
+ Object val = serialize(colTypeInfo, fieldOI, structFieldData, field.schema());
+ record.put(field.name(), val);
+ }
+ return record;
+ }
+
+ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException {
+ switch(fieldOI.getPrimitiveCategory()) {
+ case UNKNOWN:
+ throw new AvroSerdeException("Received UNKNOWN primitive category.");
+ case VOID:
+ return null;
+ default: // All other primitive types are simple
+ return fieldOI.getPrimitiveJavaObject(structFieldData);
+ }
+ }
+
+ private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ byte tag = fieldOI.getTag(structFieldData);
+
+ // Invariant that Avro's tag ordering must match Hive's.
+ return serialize(typeInfo.getAllUnionObjectTypeInfos().get(tag),
+ fieldOI.getObjectInspectors().get(tag),
+ fieldOI.getField(structFieldData),
+ schema.getTypes().get(tag));
+ }
+
+ // We treat FIXED and BYTES as arrays of tinyints within Hive. Check
+ // if we're dealing with either of these types and thus need to serialize
+ // them as their Avro types.
+ private boolean isTransformedType(Schema schema) {
+ return schema.getType().equals(FIXED) || schema.getType().equals(BYTES);
+ }
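+  // e.g. a Hive array<tinyint> column that came from an Avro FIXED or BYTES
+  // field must be turned back into a GenericData.Fixed or a ByteBuffer.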
+
+ private Object serializeTransformedType(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Beginning to transform " + typeInfo + " with Avro schema " + schema.toString(false));
+ }
+ if(schema.getType().equals(FIXED)) return serializedAvroFixed(typeInfo, fieldOI, structFieldData, schema);
+ else return serializeAvroBytes(typeInfo, fieldOI, structFieldData, schema);
+
+ }
+
+ private Object serializeAvroBytes(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ ByteBuffer bb = ByteBuffer.wrap(extraByteArray(fieldOI, structFieldData));
+ return bb.rewind();
+ }
+
+ private Object serializedAvroFixed(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ return new GenericData.Fixed(schema, extraByteArray(fieldOI, structFieldData));
+ }
+
+ // For transforming to BYTES and FIXED, pull out the byte array Avro will want
+ private byte[] extraByteArray(ListObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException {
+ // Grab a book. This is going to be slow.
+ int listLength = fieldOI.getListLength(structFieldData);
+ byte[] bytes = new byte[listLength];
+ assert fieldOI.getListElementObjectInspector() instanceof PrimitiveObjectInspector;
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector)fieldOI.getListElementObjectInspector();
+ List<?> list = fieldOI.getList(structFieldData);
+
+ for(int i = 0; i < listLength; i++) {
+ Object b = poi.getPrimitiveJavaObject(list.get(i));
+ if(!(b instanceof Byte))
+ throw new AvroSerdeException("Attempting to transform to bytes, element was not byte but " + b.getClass().getCanonicalName());
+ bytes[i] = (Byte)b;
+ }
+ return bytes;
+ }
+
+ private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ if(isTransformedType(schema))
+ return serializeTransformedType(typeInfo, fieldOI, structFieldData, schema);
+
+ List<?> list = fieldOI.getList(structFieldData);
+ List<Object> serialized = new ArrayList<Object>(list.size());
+
+ TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
+ ObjectInspector listElementObjectInspector = fieldOI.getListElementObjectInspector();
+ Schema elementType = schema.getElementType();
+
+ for(int i = 0; i < list.size(); i++) {
+ serialized.add(i, serialize(listElementTypeInfo, listElementObjectInspector, list.get(i), elementType));
+ }
+
+ return serialized;
+ }
+
+ private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+ // Avro only allows maps with string keys
+ if(!mapHasStringKey(fieldOI.getMapKeyObjectInspector()))
+ throw new AvroSerdeException("Avro only supports maps with keys as Strings. Current Map is: " + typeInfo.toString());
+
+ ObjectInspector mapKeyObjectInspector = fieldOI.getMapKeyObjectInspector();
+ ObjectInspector mapValueObjectInspector = fieldOI.getMapValueObjectInspector();
+ TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
+ TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
+ Map<?,?> map = fieldOI.getMap(structFieldData);
+ Schema valueType = schema.getValueType();
+
+ Map<Object, Object> serialized = new Hashtable<Object, Object>(fieldOI.getMapSize(structFieldData));
+
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ serialized.put(serialize(mapKeyTypeInfo, mapKeyObjectInspector, entry.getKey(), null), // This works, but is a bit fragile. Construct a single String schema?
+ serialize(mapValueTypeInfo, mapValueObjectInspector, entry.getValue(), valueType));
+ }
+
+ return serialized;
+ }
+
+ private boolean mapHasStringKey(ObjectInspector mapKeyObjectInspector) {
+ return mapKeyObjectInspector instanceof PrimitiveObjectInspector &&
+ ((PrimitiveObjectInspector) mapKeyObjectInspector)
+ .getPrimitiveCategory()
+ .equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
+ }
+
+ /**
+ * Thrown when, during serialization of a Hive row to an Avro record, Avro
+ * cannot validate the converted row against the record's schema.
+ */
+ public static class SerializeToAvroException extends AvroSerdeException {
+ final private Schema schema;
+ final private GenericData.Record record;
+
+ public SerializeToAvroException(Schema schema, GenericData.Record record) {
+ this.schema = schema;
+ this.record = record;
+ }
+
+ @Override
+ public String toString() {
+ return "Avro could not validate record against schema (record = " + record
+ + ") (schema = "+schema.toString(false) + ")";
+ }
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java
new file mode 100644
index 0000000..88e13ed
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
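+/**
+ * Thrown when the SerDe has no usable Avro schema for a table (i.e. it has
+ * fallen back to SchemaResolutionProblem's sentinel schema) and is then
+ * asked to serialize or deserialize data anyway.
+ */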
+public class BadSchemaException extends AvroSerdeException {
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
new file mode 100644
index 0000000..952b936
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+
+/**
+ * Cache for objects whose creation only depends on some other set of objects
+ * and therefore can be used against other equivalent versions of those
+ * objects. Essentially memoizes instance creation.
+ *
+ * @param <SeedObject> Object that determines the instance
+ * @param <Instance> Instance that will be created from SeedObject.
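+ *
+ * <p>A minimal usage sketch (hypothetical; it mirrors the typeInfoCache in
+ * SchemaToTypeInfo, and expensiveConversion is a stand-in for whatever
+ * costly creation the subclass performs):
+ * <pre>{@code
+ *   InstanceCache<Schema, TypeInfo> cache = new InstanceCache<Schema, TypeInfo>() {
+ *     protected TypeInfo makeInstance(Schema s) throws AvroSerdeException {
+ *       return expensiveConversion(s); // runs once per distinct seed
+ *     }
+ *   };
+ *   TypeInfo ti = cache.retrieve(schema); // later equivalent schemas hit the cache
+ * }</pre>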
+ */
+public abstract class InstanceCache<SeedObject, Instance> {
+ private static final Log LOG = LogFactory.getLog(InstanceCache.class);
+ HashMap<Integer, Instance> cache = new HashMap<Integer, Instance>(); // keyed on the seed object's hashCode
+
+ public InstanceCache() {}
+
+ /**
+ * Retrieve (or create if it doesn't exist) the correct Instance for this
+ * SeedObject
+ */
+ public Instance retrieve(SeedObject hv) throws AvroSerdeException {
+ if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
+
+ if(cache.containsKey(hv.hashCode())) {
+ if(LOG.isDebugEnabled()) LOG.debug("Returning cache result.");
+ return cache.get(hv.hashCode());
+ }
+
+ if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
+
+ Instance instance = makeInstance(hv);
+ cache.put(hv.hashCode(), instance);
+ return instance;
+ }
+
+ protected abstract Instance makeInstance(SeedObject hv) throws AvroSerdeException;
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java
new file mode 100644
index 0000000..b42b142
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+
+/**
+ * Simple pair class used for memoizing schema-reencoding operations.
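+ *
+ * Intended as the seed object for an InstanceCache keyed on a (writer
+ * schema, reader schema) combination; the equals/hashCode overrides below
+ * exist so that equivalent pairs resolve to the same cached instance.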
+ */
+class ReaderWriterSchemaPair {
+ final Schema reader;
+ final Schema writer;
+
+ public ReaderWriterSchemaPair(Schema writer, Schema reader) {
+ this.reader = reader;
+ this.writer = writer;
+ }
+
+ public Schema getReader() {
+ return reader;
+ }
+
+ public Schema getWriter() {
+ return writer;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ReaderWriterSchemaPair that = (ReaderWriterSchemaPair) o;
+
+ if (!reader.equals(that.reader)) return false;
+ if (!writer.equals(that.writer)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = reader.hashCode();
+ result = 31 * result + writer.hashCode();
+ return result;
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java
new file mode 100644
index 0000000..a87b4ed
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+
+import org.apache.avro.Schema;
+
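+/**
+ * Provides a sentinel schema for tables whose real Avro schema cannot be
+ * determined: its field names spell out the message "Cannot determine
+ * schema, check schema url and literal", so the problem shows up in query
+ * results instead of failing opaquely.
+ */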
+class SchemaResolutionProblem {
+ static final String sentinelString = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"CannotDetermineSchemaSentinel\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"ERROR_ERROR_ERROR_ERROR_ERROR_ERROR_ERROR\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"Cannot_determine_schema\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"check\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"schema\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"url\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"and\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"literal\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ public final static Schema SIGNAL_BAD_SCHEMA = Schema.parse(sentinelString);
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
new file mode 100644
index 0000000..764484c
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.avro.Schema.Type.*;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ */
+class SchemaToTypeInfo {
+ // Conversion of Avro primitive types to Hive primitive types:
+ // Avro Hive
+ // null void
+ // boolean boolean
+ // int int
+ // long bigint
+ // float float
+ // double double
+ // bytes list of tinyint (handled separately, not in this map)
+ // string string
+ // Hive's tinyint and smallint have no direct Avro counterpart.
+
+ // Map of Avro's primitive types to Hive's (for those supported by both)
+ private static final Map<Schema.Type, TypeInfo> primitiveTypeToTypeInfo = initTypeMap();
+ private static Map<Schema.Type, TypeInfo> initTypeMap() {
+ Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+ theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
+ theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+ theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+ theMap.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+ theMap.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
+ theMap.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
+ theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+ return Collections.unmodifiableMap(theMap);
+ }
+
+ /**
+ * Generate a list of TypeInfos from an Avro schema. This method is
+ * currently public due to some weirdness in deserializing unions, but
+ * will be made private once that is resolved.
+ * @param schema Schema to generate field types for
+ * @return List of TypeInfos, each element of which is a TypeInfo derived
+ * from the schema.
+ * @throws AvroSerdeException for problems during conversion.
+ */
+ public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+ List<Schema.Field> fields = schema.getFields();
+
+ List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
+
+ for (Schema.Field field : fields) {
+ types.add(generateTypeInfo(field.schema()));
+ }
+
+ return types;
+ }
+
+ static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
+ @Override
+ protected TypeInfo makeInstance(Schema s) throws AvroSerdeException {
+ return generateTypeInfoWorker(s);
+ }
+ };
+ /**
+ * Convert an Avro Schema into an equivalent Hive TypeInfo.
+ * @param schema Avro schema to convert; any Avro type is accepted, not just records.
+ * @return TypeInfo matching the Avro schema
+ * @throws AvroSerdeException for any problems during conversion.
+ */
+ public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException {
+ return typeInfoCache.retrieve(schema);
+ }
+
+ private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException {
+ // Avro requires nullable types to be defined as unions of some type T
+ // and null, e.g. ["null", "string"]. This is annoying and we're going to hide it from the user.
+ if(AvroSerdeUtils.isNullableType(schema))
+ return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema));
+
+ Schema.Type type = schema.getType();
+
+ if(primitiveTypeToTypeInfo.containsKey(type))
+ return primitiveTypeToTypeInfo.get(type);
+
+ switch(type) {
+ case BYTES: return generateBytesTypeInfo(schema);
+ case RECORD: return generateRecordTypeInfo(schema);
+ case MAP: return generateMapTypeInfo(schema);
+ case ARRAY: return generateArrayTypeInfo(schema);
+ case UNION: return generateUnionTypeInfo(schema);
+ case ENUM: return generateEnumTypeInfo(schema);
+ case FIXED: return generateFixedTypeInfo(schema);
+ default: throw new AvroSerdeException("Do not yet support: " + schema);
+ }
+ }
+
+ private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.RECORD);
+
+ List<Schema.Field> fields = schema.getFields();
+ List<String> fieldNames = new ArrayList<String>(fields.size());
+ List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+
+ for(int i = 0; i < fields.size(); i++) {
+ fieldNames.add(i, fields.get(i).name());
+ typeInfos.add(i, generateTypeInfo(fields.get(i).schema()));
+ }
+
+ return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
+ }
+
+ /**
+ * Generate a TypeInfo for an Avro Map. This is made slightly simpler in that
+ * Avro only allows maps with strings for keys.
+ */
+ private static TypeInfo generateMapTypeInfo(Schema schema) throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.MAP);
+ Schema valueType = schema.getValueType();
+ TypeInfo ti = generateTypeInfo(valueType);
+
+ return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
+ }
+
+ private static TypeInfo generateArrayTypeInfo(Schema schema) throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.ARRAY);
+ Schema itemsType = schema.getElementType();
+ TypeInfo itemsTypeInfo = generateTypeInfo(itemsType);
+
+ return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
+ }
+
+ private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.UNION);
+ List<Schema> types = schema.getTypes();
+
+ List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+
+ for(Schema type : types) {
+ typeInfos.add(generateTypeInfo(type));
+ }
+
+ return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+ }
+
+ // Hive doesn't have an Enum type, so we're going to treat them as Strings.
+ // During the deserialize/serialize stage we'll check for enumness and
+ // convert as such.
+ private static TypeInfo generateEnumTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.ENUM);
+
+ return TypeInfoFactory.getPrimitiveTypeInfo("string");
+ }
+
+ // Hive doesn't have a Fixed type, so we're going to treat them as arrays of
+ // bytes
+ // TODO: Make note in documentation that Hive sends these out as signed bytes.
+ private static final TypeInfo FIXED_AND_BYTES_EQUIV = TypeInfoFactory.getListTypeInfo(TypeInfoFactory.byteTypeInfo);
+ private static TypeInfo generateFixedTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.FIXED);
+
+ return FIXED_AND_BYTES_EQUIV;
+ }
+
+ // Avro considers bytes to be a primitive type, but Hive doesn't. We'll
+ // convert them to a list of bytes, just like Fixed. Sigh.
+ private static TypeInfo generateBytesTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.BYTES);
+ return FIXED_AND_BYTES_EQUIV;
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
new file mode 100644
index 0000000..89ee4fc
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAvroDeserializer {
+ private final GenericData GENERIC_DATA = GenericData.get();
+
+ @Test
+ public void canDeserializeVoidType() throws IOException, SerDeException {
+ String schemaString = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\": \"isANull\", \"type\": \"null\"}\n" +
+ " ]\n" +
+ "}";
+ Schema s = Schema.parse(schemaString);
+ GenericData.Record record = new GenericData.Record(s);
+
+ record.put("isANull", null);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theVoidObject = row.get(0);
+ assertNull(theVoidObject);
+
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ StructField fieldRef = oi.getStructFieldRef("isANull");
+
+ Object shouldBeNull = oi.getStructFieldData(row, fieldRef);
+ assertNull(shouldBeNull);
+ assertTrue(fieldRef.getFieldObjectInspector() instanceof VoidObjectInspector);
+ }
+
+ @Test
+ public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.MAP_WITH_PRIMITIVE_VALUE_TYPE);
+ GenericData.Record record = new GenericData.Record(s);
+
+ Map<String, Long> m = new Hashtable<String, Long>();
+ m.put("one", 1L);
+ m.put("two", 2L);
+ m.put("three", 3L);
+
+ record.put("aMap", m);
+ assertTrue(GENERIC_DATA.validate(s, record));
+ System.out.println("record = " + record);
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theMapObject = row.get(0);
+ assertTrue(theMapObject instanceof Map);
+ Map theMap = (Map)theMapObject;
+
+ // Verify the raw object that's been created
+ assertEquals(1L, theMap.get("one"));
+ assertEquals(2L, theMap.get("two"));
+ assertEquals(3L, theMap.get("three"));
+
+ // Verify that the provided object inspector can pull out these same values
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+
+ List z = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, z.size());
+ StructField fieldRef = oi.getStructFieldRef("amap");
+
+ Map theMap2 = (Map)oi.getStructFieldData(row, fieldRef);
+ assertEquals(1L, theMap2.get("one"));
+ assertEquals(2L, theMap2.get("two"));
+ assertEquals(3L, theMap2.get("three"));
+ }
+
+ @Test
+ public void canDeserializeArrays() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE);
+ GenericData.Record record = new GenericData.Record(s);
+
+ List<String> list = new ArrayList<String>();
+ list.add("Eccleston");
+ list.add("Tennant");
+ list.add("Smith");
+
+ record.put("anArray", list);
+ assertTrue(GENERIC_DATA.validate(s, record));
+ System.out.println("Array-backed record = " + record);
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+ List theList = (List)theArrayObject;
+
+ // Verify the raw object that's been created
+ assertEquals("Eccleston", theList.get(0));
+ assertEquals("Tennant", theList.get(1));
+ assertEquals("Smith", theList.get(2));
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ StructField fieldRefToArray = oi.getStructFieldRef("anArray");
+
+ Object anArrayData = oi.getStructFieldData(row, fieldRefToArray);
+ StandardListObjectInspector anArrayOI = (StandardListObjectInspector)fieldRefToArray.getFieldObjectInspector();
+ assertEquals(3, anArrayOI.getListLength(anArrayData));
+
+ JavaStringObjectInspector elementOI = (JavaStringObjectInspector)anArrayOI.getListElementObjectInspector();
+
+ Object firstElement = anArrayOI.getListElement(anArrayData, 0);
+ assertEquals("Eccleston", elementOI.getPrimitiveJavaObject(firstElement));
+ assertTrue(firstElement instanceof String);
+
+ Object secondElement = anArrayOI.getListElement(anArrayData, 1);
+ assertEquals("Tennant", elementOI.getPrimitiveJavaObject(secondElement));
+ assertTrue(secondElement instanceof String);
+
+ Object thirdElement = anArrayOI.getListElement(anArrayData, 2);
+ assertEquals("Smith", elementOI.getPrimitiveJavaObject(thirdElement));
+ assertTrue(thirdElement instanceof String);
+
+ }
+
+ @Test
+ public void canDeserializeRecords() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+ GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema());
+ innerRecord.put("int1", 42);
+ innerRecord.put("boolean1", true);
+ innerRecord.put("long1", 42432234234l);
+ record.put("aRecord", innerRecord);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theRecordObject = row.get(0);
+ System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName());
+
+ // The original record was lost in the deserialization, so just go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(1, allStructFieldRefs.size());
+ StructField fieldRefForaRecord = allStructFieldRefs.get(0);
+ assertEquals("arecord", fieldRefForaRecord.getFieldName());
+ Object innerRecord2 = oi.getStructFieldData(row, fieldRefForaRecord); // <--- use this!
+
+ // Extract innerRecord field refs
+ StandardStructObjectInspector innerRecord2OI = (StandardStructObjectInspector) fieldRefForaRecord.getFieldObjectInspector();
+
+ List<? extends StructField> allStructFieldRefs1 = innerRecord2OI.getAllStructFieldRefs();
+ assertEquals(3, allStructFieldRefs1.size());
+ assertEquals("int1", allStructFieldRefs1.get(0).getFieldName());
+ assertEquals("boolean1", allStructFieldRefs1.get(1).getFieldName());
+ assertEquals("long1", allStructFieldRefs1.get(2).getFieldName());
+
+ innerRecord2OI.getStructFieldsDataAsList(innerRecord2);
+ assertEquals(42, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(0)));
+ assertEquals(true, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(1)));
+ assertEquals(42432234234L, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(2)));
+ }
+
+ private class ResultPair { // Because Pairs give Java the vapors.
+ public final ObjectInspector oi;
+ public final Object value;
+ public final Object unionObject;
+
+ private ResultPair(ObjectInspector oi, Object value, Object unionObject) {
+ this.oi = oi;
+ this.value = value;
+ this.unionObject = unionObject;
+ }
+ }
+
+ @Test
+ public void canDeserializeUnions() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ record.put("aUnion", "this is a string");
+
+ ResultPair result = unionTester(s, record);
+ assertTrue(result.value instanceof String);
+ assertEquals("this is a string", result.value);
+ UnionObjectInspector uoi = (UnionObjectInspector)result.oi;
+ assertEquals(1, uoi.getTag(result.unionObject));
+
+ // Now the other union possibility
+ record = new GenericData.Record(s);
+ record.put("aUnion", 99);
+ result = unionTester(s, record);
+ assertTrue(result.value instanceof Integer);
+ assertEquals(99, result.value);
+ uoi = (UnionObjectInspector)result.oi;
+ assertEquals(0, uoi.getTag(result.unionObject));
+ }
+
+ private ResultPair unionTester(Schema s, GenericData.Record record) throws SerDeException, IOException {
+ assertTrue(GENERIC_DATA.validate(s, record));
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(1, fieldRefs.size());
+ StructField fieldRef = fieldRefs.get(0);
+ assertEquals("aunion", fieldRef.getFieldName());
+ Object theUnion = oi.getStructFieldData(row, fieldRef);
+
+ assertTrue(fieldRef.getFieldObjectInspector() instanceof UnionObjectInspector);
+ UnionObjectInspector fieldObjectInspector = (UnionObjectInspector)fieldRef.getFieldObjectInspector();
+ Object value = fieldObjectInspector.getField(theUnion);
+
+ return new ResultPair(fieldObjectInspector, value, theUnion);
+ }
+
+ @Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out.
+ public void canDeserializeEnums() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.ENUM_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ record.put("baddies", "DALEKS");
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(1, fieldRefs.size());
+ StructField fieldRef = fieldRefs.get(0);
+
+ assertEquals("baddies", fieldRef.getFieldName());
+
+ Object theStringObject = oi.getStructFieldData(row, fieldRef);
+ assertTrue(fieldRef.getFieldObjectInspector() instanceof StringObjectInspector);
+ StringObjectInspector soi = (StringObjectInspector)fieldRef.getFieldObjectInspector();
+
+ String finalValue = soi.getPrimitiveJavaObject(theStringObject);
+ assertEquals("DALEKS", finalValue);
+ }
+
+ @Test // Fixed doesn't exist in Hive. Fixeds go in, lists of bytes go out.
+ public void canDeserializeFixed() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.FIXED_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ byte [] bytes = "ANANCIENTBLUEBOX".getBytes();
+ record.put("hash", new GenericData.Fixed(s, bytes));
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+ List theList = (List)theArrayObject;
+ // Verify the raw object that's been created
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], theList.get(i));
+ }
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List fieldsDataAsList = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, fieldsDataAsList.size());
+ StructField fieldRef = oi.getStructFieldRef("hash");
+
+ List theList2 = (List)oi.getStructFieldData(row, fieldRef);
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], theList2.get(i));
+ }
+ }
+
+ @Test
+ public void canDeserializeBytes() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.BYTES_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ byte [] bytes = "ANANCIENTBLUEBOX".getBytes();
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ bb.rewind();
+ record.put("bytesField", bb);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List fieldsDataAsList = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, fieldsDataAsList.size());
+ StructField fieldRef = oi.getStructFieldRef("bytesField");
+
+ List theList2 = (List)oi.getStructFieldData(row, fieldRef);
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], theList2.get(i));
+ }
+ }
+
+ @Test
+ public void canDeserializeNullableTypes() throws IOException, SerDeException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.NULLABLE_STRING_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+ record.put("nullableString", "this is a string");
+
+ verifyNullableType(record, s, "this is a string");
+
+ record = new GenericData.Record(s);
+ record.put("nullableString", null);
+ verifyNullableType(record, s, null);
+ }
+
+ private void verifyNullableType(GenericData.Record record, Schema s, String expected) throws SerDeException, IOException {
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList row = (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object rowElement = row.get(0);
+
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List fieldsDataAsList = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, fieldsDataAsList.size());
+ StructField fieldRef = oi.getStructFieldRef("nullablestring");
+ ObjectInspector fieldObjectInspector = fieldRef.getFieldObjectInspector();
+ StringObjectInspector soi = (StringObjectInspector)fieldObjectInspector;
+
+ if(expected == null)
+ assertNull(soi.getPrimitiveJavaObject(rowElement));
+ else
+ assertEquals("this is a string", soi.getPrimitiveJavaObject(rowElement));
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
new file mode 100644
index 0000000..ff3ec72
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestAvroObjectInspectorGenerator {
+ private final TypeInfo STRING = TypeInfoFactory.getPrimitiveTypeInfo("string");
+ private final TypeInfo INT = TypeInfoFactory.getPrimitiveTypeInfo("int");
+ private final TypeInfo BOOLEAN = TypeInfoFactory.getPrimitiveTypeInfo("boolean");
+ private final TypeInfo LONG = TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+ private final TypeInfo FLOAT = TypeInfoFactory.getPrimitiveTypeInfo("float");
+ private final TypeInfo DOUBLE = TypeInfoFactory.getPrimitiveTypeInfo("double");
+ private final TypeInfo VOID = TypeInfoFactory.getPrimitiveTypeInfo("void");
+
+ // These schemata are used in other tests
+ static public final String MAP_WITH_PRIMITIVE_VALUE_TYPE = "{\n" +
+ " \"namespace\": \"testing\",\n" +
+ " \"name\": \"oneMap\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aMap\",\n" +
+ " \"type\":{\"type\":\"map\",\n" +
+ " \"values\":\"long\"}\n" +
+ "\t}\n" +
+ " ]\n" +
+ "}";
+ static public final String ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE = "{\n" +
+ " \"namespace\": \"testing\",\n" +
+ " \"name\": \"oneArray\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"anArray\",\n" +
+ " \"type\":{\"type\":\"array\",\n" +
+ " \"items\":\"string\"}\n" +
+ "\t}\n" +
+ " ]\n" +
+ "}";
+ public static final String RECORD_SCHEMA = "{\n" +
+ " \"namespace\": \"testing.test.mctesty\",\n" +
+ " \"name\": \"oneRecord\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aRecord\",\n" +
+ " \"type\":{\"type\":\"record\",\n" +
+ " \"name\":\"recordWithinARecord\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"int1\",\n" +
+ " \"type\":\"int\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"boolean1\",\n" +
+ " \"type\":\"boolean\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"long1\",\n" +
+ " \"type\":\"long\"\n" +
+ " }\n" +
+ " ]}\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ public static final String UNION_SCHEMA = "{\n" +
+ " \"namespace\": \"test.a.rossa\",\n" +
+ " \"name\": \"oneUnion\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aUnion\",\n" +
+ " \"type\":[\"int\", \"string\"]\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ public static final String ENUM_SCHEMA = "{\n" +
+ " \"namespace\": \"clever.namespace.name.in.space\",\n" +
+ " \"name\": \"oneEnum\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"baddies\",\n" +
+ " \"type\":{\"type\":\"enum\",\"name\":\"villians\", \"symbols\": [\"DALEKS\", \"CYBERMEN\", \"SLITHEEN\", \"JAGRAFESS\"]}\n" +
+ " \n" +
+ " \n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ public static final String FIXED_SCHEMA = "{\n" +
+ " \"namespace\": \"ecapseman\",\n" +
+ " \"name\": \"oneFixed\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"hash\",\n" +
+ " \"type\":{\"type\": \"fixed\", \"name\": \"MD5\", \"size\": 16}\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ public static final String NULLABLE_STRING_SCHEMA = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullableUnionTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"nullableString\", \"type\":[\"null\", \"string\"]}\n" +
+ " ]\n" +
+ "}";
+ public static final String BYTES_SCHEMA = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"bytesTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"bytesField\", \"type\":\"bytes\"}\n" +
+ " ]\n" +
+ "}";
+
+ public static final String KITCHEN_SINK_SCHEMA = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"kitchsink\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"string1\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"string2\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"int1\",\n" +
+ " \"type\":\"int\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"boolean1\",\n" +
+ " \"type\":\"boolean\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"long1\",\n" +
+ " \"type\":\"long\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"float1\",\n" +
+ " \"type\":\"float\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"double1\",\n" +
+ " \"type\":\"double\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"inner_record1\",\n" +
+ " \"type\":{ \"type\":\"record\",\n" +
+ " \"name\":\"inner_record1_impl\",\n" +
+ " \"fields\": [\n" +
+ " {\"name\":\"int_in_inner_record1\",\n" +
+ " \"type\":\"int\"},\n" +
+ " {\"name\":\"string_in_inner_record1\",\n" +
+ " \"type\":\"string\"}\n" +
+ " ]\n" +
+ " }\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"enum1\",\n" +
+ " \"type\":{\"type\":\"enum\", \"name\":\"enum1_values\", \"symbols\":[\"ENUM1_VALUES_VALUE1\",\"ENUM1_VALUES_VALUE2\", \"ENUM1_VALUES_VALUE3\"]}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"array1\",\n" +
+ " \"type\":{\"type\":\"array\", \"items\":\"string\"}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"map1\",\n" +
+ " \"type\":{\"type\":\"map\", \"values\":\"string\"}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"union1\",\n" +
+ " \"type\":[\"float\", \"boolean\", \"string\"]\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"fixed1\",\n" +
+ " \"type\":{\"type\":\"fixed\", \"name\":\"fourbytes\", \"size\":4}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"null1\",\n" +
+ " \"type\":\"null\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"UnionNullInt\",\n" +
+ " \"type\":[\"int\", \"null\"]\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"bytes1\",\n" +
+ " \"type\":\"bytes\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ @Test // that we can only process records
+ public void failOnNonRecords() throws Exception {
+ String nonRecordSchema = "{ \"type\": \"enum\",\n" +
+ " \"name\": \"Suit\",\n" +
+ " \"symbols\" : [\"SPADES\", \"HEARTS\", \"DIAMONDS\", \"CLUBS\"]\n" +
+ "}";
+
+ Schema s = Schema.parse(nonRecordSchema);
+ try {
+ new AvroObjectInspectorGenerator(s);
+ fail("Should not be able to handle non-record Avro types");
+ } catch(SerDeException sde) {
+ assertTrue(sde.getMessage().startsWith("Schema for table must be of type RECORD"));
+ }
+ }
+
+ @Test
+ public void primitiveTypesWorkCorrectly() throws SerDeException {
+ final String bunchOfPrimitives = "{\n" +
+ " \"namespace\": \"testing\",\n" +
+ " \"name\": \"PrimitiveTypes\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aString\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"anInt\",\n" +
+ " \"type\":\"int\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"aBoolean\",\n" +
+ " \"type\":\"boolean\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"aLong\",\n" +
+ " \"type\":\"long\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"aFloat\",\n" +
+ " \"type\":\"float\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"aDouble\",\n" +
+ " \"type\":\"double\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"aNull\",\n" +
+ " \"type\":\"null\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(Schema.parse(bunchOfPrimitives));
+
+ String [] expectedColumnNames = {"aString", "anInt", "aBoolean", "aLong", "aFloat", "aDouble", "aNull"};
+ verifyColumnNames(expectedColumnNames, aoig.getColumnNames());
+
+ TypeInfo [] expectedColumnTypes = {STRING, INT, BOOLEAN, LONG, FLOAT, DOUBLE, VOID};
+ verifyColumnTypes(expectedColumnTypes, aoig.getColumnTypes());
+
+ // Rip apart the object inspector, making sure we got what we expect.
+ final ObjectInspector oi = aoig.getObjectInspector();
+ assertTrue(oi instanceof StandardStructObjectInspector);
+ final StandardStructObjectInspector ssoi = (StandardStructObjectInspector)oi;
+ List<? extends StructField> structFields = ssoi.getAllStructFieldRefs();
+ assertEquals(expectedColumnNames.length, structFields.size());
+
+ for(int i = 0; i < expectedColumnNames.length;i++) {
+ assertEquals("Column names don't match", expectedColumnNames[i].toLowerCase(), structFields.get(i).getFieldName());
+ assertEquals("Column types don't match", expectedColumnTypes[i].getTypeName(), structFields.get(i).getFieldObjectInspector().getTypeName());
+ }
+ }
+
+ private void verifyColumnTypes(TypeInfo[] expectedColumnTypes, List<TypeInfo> columnTypes) {
+ for(int i = 0; i < expectedColumnTypes.length; i++) {
+ assertEquals(expectedColumnTypes[i], columnTypes.get(i));
+ }
+ }
+
+ private void verifyColumnNames(String[] expectedColumnNames, List<String> columnNames) {
+ for(int i = 0; i < expectedColumnNames.length; i++) {
+ assertEquals(expectedColumnNames[i], columnNames.get(i));
+ }
+ }
+
+ @Test
+ public void canHandleMapsWithPrimitiveValueTypes() throws SerDeException {
+ Schema s = Schema.parse(MAP_WITH_PRIMITIVE_VALUE_TYPE);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("aMap", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertEquals(ObjectInspector.Category.MAP, typeInfo.getCategory());
+ assertTrue(typeInfo instanceof MapTypeInfo);
+ MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo;
+
+ assertEquals("bigint" /* == long in Avro */, mapTypeInfo.getMapValueTypeInfo().getTypeName());
+ assertEquals("string", mapTypeInfo.getMapKeyTypeInfo().getTypeName());
+ }
+
+ @Test
+ public void canHandleArrays() throws SerDeException {
+ Schema s = Schema.parse(ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("anArray", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertEquals(ObjectInspector.Category.LIST, typeInfo.getCategory());
+ assertTrue(typeInfo instanceof ListTypeInfo);
+ ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo;
+
+ assertEquals("string", listTypeInfo.getListElementTypeInfo().getTypeName());
+ }
+
+ @Test
+ public void canHandleRecords() throws SerDeException {
+ Schema s = Schema.parse(RECORD_SCHEMA);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("aRecord", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertEquals(ObjectInspector.Category.STRUCT, typeInfo.getCategory());
+ assertTrue(typeInfo instanceof StructTypeInfo);
+ StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo;
+
+ // Check individual elements of subrecord
+ ArrayList<String> allStructFieldNames = structTypeInfo.getAllStructFieldNames();
+ ArrayList<TypeInfo> allStructFieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ assertEquals(3, allStructFieldNames.size());
+ String[] names = new String[]{"int1", "boolean1", "long1"};
+ String [] typeInfoStrings = new String [] {"int", "boolean", "bigint"};
+ for(int i = 0; i < allStructFieldNames.size(); i++) {
+ assertEquals("Fieldname " + allStructFieldNames.get(i) + " doesn't match expected " + names[i], names[i], allStructFieldNames.get(i));
+ assertEquals("Typeinfo " + allStructFieldTypeInfos.get(i) + " doesn't match expected " + typeInfoStrings[i], typeInfoStrings[i], allStructFieldTypeInfos.get(i).getTypeName());
+ }
+ }
+
+ @Test
+ public void canHandleUnions() throws SerDeException {
+ Schema s = Schema.parse(UNION_SCHEMA);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("aUnion", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertTrue(typeInfo instanceof UnionTypeInfo);
+ UnionTypeInfo uti = (UnionTypeInfo)typeInfo;
+
+ // Check that the union has come out unscathed. No scathing of unions allowed.
+ List<TypeInfo> typeInfos = uti.getAllUnionObjectTypeInfos();
+ assertEquals(2, typeInfos.size());
+ assertEquals(INT, typeInfos.get(0));
+ assertEquals(STRING, typeInfos.get(1));
+ assertEquals("uniontype", uti.getTypeName());
+ }
+
+ @Test // Enums are one of two Avro types that Hive doesn't have any native support for.
+ public void canHandleEnums() throws SerDeException {
+ Schema s = Schema.parse(ENUM_SCHEMA);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names - we lose the enumness of this schema
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("baddies", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ assertEquals(STRING, aoig.getColumnTypes().get(0));
+ }
+
+ @Test // Hive has no concept of Avro's fixed type. Fixed -> arrays of bytes
+ public void canHandleFixed() throws SerDeException {
+ Schema s = Schema.parse(FIXED_SCHEMA);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("hash", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertTrue(typeInfo instanceof ListTypeInfo);
+ ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+ assertTrue(listTypeInfo.getListElementTypeInfo() instanceof PrimitiveTypeInfo);
+ assertEquals("tinyint", listTypeInfo.getListElementTypeInfo().getTypeName());
+ }
+
+ @Test // Avro considers bytes primitive, Hive doesn't. Make them list of tinyint.
+ public void canHandleBytes() throws SerDeException {
+ Schema s = Schema.parse(BYTES_SCHEMA);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ // Column names
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("bytesField", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertTrue(typeInfo instanceof ListTypeInfo);
+ ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+ assertTrue(listTypeInfo.getListElementTypeInfo() instanceof PrimitiveTypeInfo);
+ assertEquals("tinyint", listTypeInfo.getListElementTypeInfo().getTypeName());
+ }
+
+ @Test // That Union[T, NULL] is converted to just T.
+ public void convertsNullableTypes() throws SerDeException {
+ Schema s = Schema.parse(NULLABLE_STRING_SCHEMA);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+ assertEquals(1, aoig.getColumnNames().size());
+ assertEquals("nullableString", aoig.getColumnNames().get(0));
+
+ // Column types
+ assertEquals(1, aoig.getColumnTypes().size());
+ TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+ assertTrue(typeInfo instanceof PrimitiveTypeInfo);
+ PrimitiveTypeInfo pti = (PrimitiveTypeInfo) typeInfo;
+ // Verify the union has been hidden and just the main type has been returned.
+ assertEquals(PrimitiveObjectInspector.PrimitiveCategory.STRING, pti.getPrimitiveCategory());
+ }
+
+ @Test
+ public void objectInspectorsAreCached() throws SerDeException {
+ // Verify that Hive is caching the object inspectors for us.
+ Schema s = Schema.parse(KITCHEN_SINK_SCHEMA);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ Schema s2 = Schema.parse(KITCHEN_SINK_SCHEMA);
+ AvroObjectInspectorGenerator aoig2 = new AvroObjectInspectorGenerator(s2);
+
+
+ assertEquals(aoig.getObjectInspector(), aoig2.getObjectInspector());
+ // For once we actually want reference equality in Java.
+ assertTrue(aoig.getObjectInspector() == aoig2.getObjectInspector());
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java
new file mode 100644
index 0000000..4d70cc6
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AVRO_SERDE_SCHEMA;
+import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.SCHEMA_LITERAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestAvroSerde {
+ static final String originalSchemaString = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"previous\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"text\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ static final String newSchemaString = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"new\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"text\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ static final Schema originalSchema = Schema.parse(originalSchemaString);
+ static final Schema newSchema = Schema.parse(newSchemaString);
+
+ @Test
+ public void initializeDoesNotReuseSchemasFromConf() throws SerDeException {
+    // Hive re-uses the Configuration object it passes in for initialization,
+    // so we need to make sure we don't pick up any old schemas left within it.
+ Configuration conf = new Configuration();
+ conf.set(AVRO_SERDE_SCHEMA, originalSchema.toString(false));
+
+ Properties props = new Properties();
+ props.put(SCHEMA_LITERAL, newSchemaString);
+
+ AvroSerDe asd = new AvroSerDe();
+ asd.initialize(conf, props);
+
+ // Verify that the schema now within the configuration is the one passed
+ // in via the properties
+ assertEquals(newSchema, Schema.parse(conf.get(AVRO_SERDE_SCHEMA)));
+ }
+
+ @Test
+ public void noSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void gibberishSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, "blahblahblah");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void emptySchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, "");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void badSchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, "not://a/url");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void emptySchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, "");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void bothPropertiesSetToNoneReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, AvroSerdeUtils.SCHEMA_NONE);
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, AvroSerdeUtils.SCHEMA_NONE);
+
+ verifyErrorSchemaReturned(props);
+ }
+
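+  // Initialize the serde with the given properties and verify that it falls
+  // back to the sentinel "error" schema, then refuses to serialize or
+  // deserialize by throwing BadSchemaException.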
+ private void verifyErrorSchemaReturned(Properties props) throws SerDeException {
+ AvroSerDe asd = new AvroSerDe();
+ asd.initialize(new Configuration(), props);
+ assertTrue(asd.getObjectInspector() instanceof StandardStructObjectInspector);
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)asd.getObjectInspector();
+    List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA.getFields().size(), allStructFieldRefs.size());
+ StructField firstField = allStructFieldRefs.get(0);
+ assertTrue(firstField.toString().contains("error_error_error_error_error_error_error"));
+
+ try {
+ Writable mock = Mockito.mock(Writable.class);
+ asd.deserialize(mock);
+ fail("Should have thrown a BadSchemaException");
+ } catch (BadSchemaException bse) {
+ // good
+ }
+
+ try {
+ Object o = Mockito.mock(Object.class);
+ ObjectInspector mockOI = Mockito.mock(ObjectInspector.class);
+ asd.serialize(o, mockOI);
+ fail("Should have thrown a BadSchemaException");
+ } catch (BadSchemaException bse) {
+ // good
+ }
+ }
+
+ @Test
+ public void getSerializedClassReturnsCorrectType() {
+ AvroSerDe asd = new AvroSerDe();
+ assertEquals(AvroGenericRecordWritable.class, asd.getSerializedClass());
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java
new file mode 100644
index 0000000..4cfa7ef
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class TestAvroSerdeUtils {
+ private final String NULLABLE_UNION = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"mayBeNull\", \"type\":[\"string\", \"null\"]}\n" +
+ " ]\n" +
+ "}";
+  // Same union, with the order reversed
+ private final String NULLABLE_UNION2 = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"mayBeNull\", \"type\":[\"null\", \"string\"]}\n" +
+ " ]\n" +
+ "}";
+
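+  // Parse the given schema and assert whether the named field is treated as nullable.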
+ private void testField(String schemaString, String fieldName, boolean shouldBeNullable) {
+ Schema s = Schema.parse(schemaString);
+ assertEquals(shouldBeNullable, isNullableType(s.getField(fieldName).schema()));
+ }
+
+ @Test
+ public void isNullableTypeAcceptsNullableUnions() {
+ testField(NULLABLE_UNION, "mayBeNull", true);
+ testField(NULLABLE_UNION2, "mayBeNull", true);
+ }
+
+ @Test
+ public void isNullableTypeIdentifiesUnionsOfMoreThanTwoTypes() {
+ String schemaString = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"shouldNotPass\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"mayBeNull\", \"type\":[\"string\", \"int\", \"null\"]}\n" +
+ " ]\n" +
+ "}";
+ testField(schemaString, "mayBeNull", false);
+ }
+
+ @Test
+ public void isNullableTypeIdentifiesUnionsWithoutNulls() {
+ String s = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"unionButNoNull\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"a\", \"type\":[\"int\", \"string\"]}\n" +
+ " ]\n" +
+ "}";
+ testField(s, "a", false);
+ }
+
+ @Test
+ public void isNullableTypeIdentifiesNonUnionTypes() {
+ String schemaString = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullTest2\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"justAnInt\", \"type\":\"int\"}\n" +
+ " ]\n" +
+ "}";
+ testField(schemaString, "justAnInt", false);
+ }
+
+ @Test
+ public void getTypeFromNullableTypePositiveCase() {
+ Schema s = Schema.parse(NULLABLE_UNION);
+ Schema typeFromNullableType = getOtherTypeFromNullableType(s.getField("mayBeNull").schema());
+ assertEquals(Schema.Type.STRING, typeFromNullableType.getType());
+
+ s = Schema.parse(NULLABLE_UNION2);
+ typeFromNullableType = getOtherTypeFromNullableType(s.getField("mayBeNull").schema());
+ assertEquals(Schema.Type.STRING, typeFromNullableType.getType());
+ }
+
+ @Test(expected=AvroSerdeException.class)
+ public void determineSchemaThrowsExceptionIfNoSchema() throws IOException, AvroSerdeException {
+ Properties prop = new Properties();
+ AvroSerdeUtils.determineSchemaOrThrowException(prop);
+ }
+
+ @Test
+ public void determineSchemaFindsLiterals() throws Exception {
+ String schema = TestAvroObjectInspectorGenerator.RECORD_SCHEMA;
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, schema);
+ Schema expected = Schema.parse(schema);
+ assertEquals(expected, AvroSerdeUtils.determineSchemaOrThrowException(props));
+ }
+
+ @Test
+  public void determineSchemaTriesToOpenUrl() throws AvroSerdeException, IOException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, "not:///a.real.url");
+
+ try {
+ AvroSerdeUtils.determineSchemaOrThrowException(props);
+ fail("Should have tried to open that URL");
+ } catch(MalformedURLException e) {
+ assertEquals("unknown protocol: not", e.getMessage());
+ }
+ }
+
+ @Test
+ public void noneOptionWorksForSpecifyingSchemas() throws IOException, AvroSerdeException {
+ Properties props = new Properties();
+
+ // Combo 1: Both set to none
+ props.put(SCHEMA_URL, SCHEMA_NONE);
+ props.put(SCHEMA_LITERAL, SCHEMA_NONE);
+ try {
+ determineSchemaOrThrowException(props);
+ fail("Should have thrown exception with none set for both url and literal");
+ } catch(AvroSerdeException he) {
+ assertEquals(EXCEPTION_MESSAGE, he.getMessage());
+ }
+
+ // Combo 2: Literal set, url set to none
+ props.put(SCHEMA_LITERAL, TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
+ Schema s;
+ try {
+ s = determineSchemaOrThrowException(props);
+ assertNotNull(s);
+ assertEquals(Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA), s);
+ } catch(AvroSerdeException he) {
+ fail("Should have parsed schema literal, not thrown exception.");
+ }
+
+ // Combo 3: url set, literal set to none
+ props.put(SCHEMA_LITERAL, SCHEMA_NONE);
+ props.put(SCHEMA_URL, "not:///a.real.url");
+ try {
+ determineSchemaOrThrowException(props);
+ fail("Should have tried to open that bogus URL");
+ } catch(MalformedURLException e) {
+ assertEquals("unknown protocol: not", e.getMessage());
+ }
+ }
+
+ @Test
+ public void determineSchemaCanReadSchemaFromHDFS() throws IOException, AvroSerdeException {
+ // TODO: Make this an integration test, mock out hdfs for the actual unit test.
+ String schemaString = TestAvroObjectInspectorGenerator.RECORD_SCHEMA;
+ MiniDFSCluster miniDfs = null;
+ try {
+ // MiniDFSCluster litters files and folders all over the place.
+ System.setProperty("test.build.data", "./test-intermediate-stuff-data/");
+ miniDfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+
+ miniDfs.getFileSystem().mkdirs(new Path("/path/to/schema"));
+ FSDataOutputStream out = miniDfs.getFileSystem().create(new Path("/path/to/schema/schema.avsc"));
+ out.writeBytes(schemaString);
+ out.close();
+ String onHDFS = miniDfs.getFileSystem().getUri() + "/path/to/schema/schema.avsc";
+
+ Schema schemaFromHDFS = AvroSerdeUtils.getSchemaFromHDFS(onHDFS, miniDfs.getFileSystem().getConf());
+ Schema expectedSchema = Schema.parse(schemaString);
+ assertEquals(expectedSchema, schemaFromHDFS);
+ } finally {
+ if(miniDfs != null) miniDfs.shutdown();
+ }
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
new file mode 100644
index 0000000..4cf3b45
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+public class TestAvroSerializer {
+
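+  // Wrap the supplied field definitions in a minimal record schema for these tests.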
+ private Schema buildSchema(String recordValues) {
+ String s = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"test_serializer\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [" +
+ recordValues +
+ " ] }";
+ return Schema.parse(s);
+ }
+
+  /**
+   * Verify that we can serialize an Avro value by taking one, running it
+   * through the deserialization process and then serializing it again.
+   */
+ private GenericRecord serializeAndDeserialize(String recordValue, String fieldName, Object fieldValue) throws SerDeException, IOException {
+ Schema s = buildSchema(recordValue);
+ GenericData.Record r = new GenericData.Record(s);
+ r.put(fieldName, fieldValue);
+
+ AvroSerializer as = new AvroSerializer();
+
+ AvroDeserializer ad = new AvroDeserializer();
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+ ObjectInspector oi = aoig.getObjectInspector();
+    List<String> columnNames = aoig.getColumnNames();
+    List<TypeInfo> columnTypes = aoig.getColumnTypes();
+
+ AvroGenericRecordWritable agrw = Utils.serializeAndDeserializeRecord(r);
+ Object obj = ad.deserialize(columnNames, columnTypes, agrw, s);
+
+ Writable result = as.serialize(obj, oi, columnNames, columnTypes, s);
+ assertTrue(result instanceof AvroGenericRecordWritable);
+ GenericRecord r2 = ((AvroGenericRecordWritable) result).getRecord();
+ assertEquals(s, r2.getSchema());
+ return r2;
+ }
+
+ @Test
+ public void canSerializeStrings() throws SerDeException, IOException {
+ singleFieldTest("string1", "hello", "string");
+ }
+
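+  // Round-trip a record with a single field of the given type and verify
+  // the value survives unchanged.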
+ private void singleFieldTest(String fieldName, Object fieldValue, String fieldType) throws SerDeException, IOException {
+ GenericRecord r2 = serializeAndDeserialize("{ \"name\":\"" + fieldName + "\", \"type\":\"" + fieldType + "\" }", fieldName, fieldValue);
+ assertEquals(fieldValue, r2.get(fieldName));
+ }
+
+ @Test
+ public void canSerializeInts() throws SerDeException, IOException {
+ singleFieldTest("int1", 42, "int");
+ }
+
+ @Test
+ public void canSerializeBooleans() throws SerDeException, IOException {
+ singleFieldTest("boolean1", true, "boolean");
+ }
+
+ @Test
+ public void canSerializeFloats() throws SerDeException, IOException {
+ singleFieldTest("float1", 42.24342f, "float");
+ }
+
+ @Test
+ public void canSerializeDoubles() throws SerDeException, IOException {
+ singleFieldTest("double1", 24.00000001, "double");
+ }
+
+ @Test
+ public void canSerializeLists() throws SerDeException, IOException {
+    List<Integer> intList = new ArrayList<Integer>();
+    Collections.addAll(intList, 1, 2, 3);
+ String field = "{ \"name\":\"list1\", \"type\":{\"type\":\"array\", \"items\":\"int\"} }";
+ GenericRecord r = serializeAndDeserialize(field, "list1", intList);
+ assertEquals(intList, r.get("list1"));
+ }
+
+ @Test
+ public void canSerializeMaps() throws SerDeException, IOException {
+    Map<String, Boolean> m = new Hashtable<String, Boolean>();
+ m.put("yes", true);
+ m.put("no", false);
+ String field = "{ \"name\":\"map1\", \"type\":{\"type\":\"map\", \"values\":\"boolean\"} }";
+ GenericRecord r = serializeAndDeserialize(field, "map1", m);
+
+ assertEquals(m, r.get("map1"));
+ }
+
+ @Test
+ public void canSerializeStructs() throws SerDeException {
+ String field = "{ \"name\":\"struct1\", \"type\":{\"type\":\"record\", \"name\":\"struct1_name\", \"fields\": [\n" +
+ "{ \"name\":\"sInt\", \"type\":\"int\" }, { \"name\":\"sBoolean\", \"type\":\"boolean\" }, { \"name\":\"sString\", \"type\":\"string\" } ] } }";
+
+ Schema s = buildSchema(field);
+ GenericData.Record innerRecord = new GenericData.Record(s.getField("struct1").schema());
+
+ innerRecord.put("sInt", 77);
+ innerRecord.put("sBoolean", false);
+ innerRecord.put("sString", "tedious");
+
+ GenericData.Record r = new GenericData.Record(s);
+ r.put("struct1", innerRecord);
+
+ AvroSerializer as = new AvroSerializer();
+
+ AvroDeserializer ad = new AvroDeserializer();
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+ ObjectInspector oi = aoig.getObjectInspector();
+    List<String> columnNames = aoig.getColumnNames();
+    List<TypeInfo> columnTypes = aoig.getColumnTypes();
+ AvroGenericRecordWritable agrw = new AvroGenericRecordWritable(r);
+ Object obj = ad.deserialize(columnNames, columnTypes, agrw, s);
+
+ Writable result = as.serialize(obj, oi, columnNames, columnTypes, s);
+ assertTrue(result instanceof AvroGenericRecordWritable);
+ GenericRecord r2 = ((AvroGenericRecordWritable) result).getRecord();
+ assertEquals(s, r2.getSchema());
+
+ GenericRecord r3 = (GenericRecord)r2.get("struct1");
+ assertEquals(77, r3.get("sInt"));
+ assertEquals(false, r3.get("sBoolean"));
+ assertEquals("tedious", r3.get("sString"));
+ }
+
+ @Test
+ public void canSerializeUnions() throws SerDeException, IOException {
+ String field = "{ \"name\":\"union1\", \"type\":[\"float\", \"boolean\", \"string\"] }";
+ GenericRecord r = serializeAndDeserialize(field, "union1", 424.4f);
+ assertEquals(424.4f, r.get("union1"));
+
+ r = serializeAndDeserialize(field, "union1", true);
+ assertEquals(true, r.get("union1"));
+
+ r = serializeAndDeserialize(field, "union1", "hello");
+ assertEquals("hello", r.get("union1"));
+ }
+
+  private enum enum1 {BLUE, RED, GREEN}
+ @Test
+ public void canSerializeEnums() throws SerDeException, IOException {
+ for(enum1 e : enum1.values()) {
+ String field = "{ \"name\":\"enum1\", \"type\":{\"type\":\"enum\", \"name\":\"enum1_values\", \"symbols\":[\"BLUE\",\"RED\", \"GREEN\"]} }";
+ GenericRecord r = serializeAndDeserialize(field, "enum1", e);
+
+ assertEquals(e, enum1.valueOf(r.get("enum1").toString()));
+ }
+  }
+
+ @Test
+ public void canSerializeNullableTypes() throws SerDeException, IOException {
+ String field = "{ \"name\":\"nullableint\", \"type\":[\"int\", \"null\"] }";
+ GenericRecord r = serializeAndDeserialize(field, "nullableint", 42);
+ assertEquals(42, r.get("nullableint"));
+
+ r = serializeAndDeserialize(field, "nullableint", null);
+ assertNull(r.get("nullableint"));
+ }
+
+ @Test
+ public void canSerializeBytes() throws SerDeException, IOException {
+ String field = "{ \"name\":\"bytes1\", \"type\":\"bytes\" }";
+ ByteBuffer bb = ByteBuffer.wrap("easy as one two three".getBytes());
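+    // Reset the buffer's position so the serializer reads it from the start.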
+ bb.rewind();
+ GenericRecord r = serializeAndDeserialize(field, "bytes1", bb);
+
+ assertEquals(bb, r.get("bytes1"));
+ }
+
+ @Test
+ public void canSerializeFixed() throws SerDeException, IOException {
+ String field = "{ \"name\":\"fixed1\", \"type\":{\"type\":\"fixed\", \"name\":\"threebytes\", \"size\":3} }";
+ GenericData.Fixed fixed = new GenericData.Fixed(buildSchema(field), "k9@".getBytes());
+ GenericRecord r = serializeAndDeserialize(field, "fixed1", fixed);
+
+ assertArrayEquals(fixed.bytes(), ((GenericData.Fixed) r.get("fixed1")).bytes());
+ }
+
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java
new file mode 100644
index 0000000..38a4de1
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+import java.io.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestGenericAvroRecordWritable {
+ private static final String schemaJSON = "{\n" +
+ " \"namespace\": \"gallifrey\",\n" +
+ " \"name\": \"TestPerson\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"first\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"last\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ @Test
+ public void writableContractIsImplementedCorrectly() throws IOException {
+ Schema schema = Schema.parse(schemaJSON);
+
+ GenericRecord gr = new GenericData.Record(schema);
+ gr.put("first", "The");
+ gr.put("last", "Doctor");
+
+ assertEquals("The", gr.get("first"));
+ assertEquals("Doctor", gr.get("last"));
+
+ AvroGenericRecordWritable garw = new AvroGenericRecordWritable(gr);
+
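+    // Push the record through the Writable contract: serialize to bytes,
+    // then read those bytes back into a second writable.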
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new DataOutputStream(baos);
+ garw.write(daos);
+
+    // Use a fresh, empty writable so the record must come back from the
+    // serialized bytes rather than the original reference.
+    AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dais = new DataInputStream(bais);
+
+ garw2.readFields(dais);
+
+ GenericRecord gr2 = garw2.getRecord();
+
+ assertEquals("The", gr2.get("first").toString());
+ assertEquals("Doctor", gr2.get("last").toString());
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java
new file mode 100644
index 0000000..b327cd9
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertSame;
+
+public class TestInstanceCache {
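+  // Every Foo reports the same hash code, so the cache should treat distinct
+  // instances as equivalent seeds and return a single cached wrapper for all of them.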
+ private static class Foo {
+ @Override
+ public int hashCode() {
+ return 42;
+ }
+ }
+
+  private static class Wrapper<T> {
+ public final T wrapped;
+
+ private Wrapper(T wrapped) {
+ this.wrapped = wrapped;
+ }
+ }
+
+ @Test
+ public void instanceCachesOnlyCreateOneInstance() throws AvroSerdeException {
+    InstanceCache<Foo, Wrapper<Foo>> ic = new InstanceCache<Foo, Wrapper<Foo>>() {
+      @Override
+      protected Wrapper<Foo> makeInstance(Foo hv) {
+        return new Wrapper<Foo>(hv);
+      }
+    };
+ Foo f1 = new Foo();
+
+    Wrapper<Foo> fc = ic.retrieve(f1);
+ assertSame(f1, fc.wrapped); // Our original foo should be in the wrapper
+
+ Foo f2 = new Foo(); // Different instance, same value
+
+    Wrapper<Foo> fc2 = ic.retrieve(f2);
+    assertSame(fc2, fc); // Since f2 is equivalent to f1, we should get back the first wrapper
+    assertSame(fc2.wrapped, f1);
+ }
+
+ @Test
+ public void instanceCacheReturnsCorrectInstances() throws AvroSerdeException {
+    InstanceCache<String, Wrapper<String>> ic = new InstanceCache<String, Wrapper<String>>() {
+      @Override
+      protected Wrapper<String> makeInstance(String hv) {
+        return new Wrapper<String>(hv);
+      }
+    };
+
+    Wrapper<String> one = ic.retrieve("one");
+    Wrapper<String> two = ic.retrieve("two");
+
+    Wrapper<String> anotherOne = ic.retrieve("one");
+ assertSame(one, anotherOne);
+
+    Wrapper<String> anotherTwo = ic.retrieve("two");
+ assertSame(two, anotherTwo);
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java
new file mode 100644
index 0000000..3f41938
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSchemaReEncoder {
+ @Test
+ public void schemasCanAddFields() throws SerDeException {
+ String original = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"Line\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"text\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ String evolved = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"Line\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"text\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"new_kid\",\n" +
+ " \"type\":\"string\",\n" +
+ " \"default\":\"Hi!\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ Schema originalSchema = Schema.parse(original);
+ Schema evolvedSchema = Schema.parse(evolved);
+
+ GenericRecord record = new GenericData.Record(originalSchema);
+ record.put("text", "it is a far better thing I do, yadda, yadda");
+ assertTrue(GenericData.get().validate(originalSchema, record));
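+    // Re-encode against the evolved schema; the new field should be filled
+    // in with its declared default.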
+ AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder();
+ GenericRecord r2 = schemaReEncoder.reencode(record, evolvedSchema);
+
+ assertTrue(GenericData.get().validate(evolvedSchema, r2));
+ assertEquals("Hi!", r2.get("new_kid").toString());
+
+ // Now make sure that we can re-use the re-encoder against a completely
+ // different record to save resources
+ String original2 = "{\n" +
+ " \"namespace\": \"somebody.else\",\n" +
+ " \"name\": \"something_else\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"a\",\n" +
+ " \"type\":\"int\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ String evolved2 = "{\n" +
+ " \"namespace\": \"somebody.else\",\n" +
+ " \"name\": \"something_else\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"a\",\n" +
+ " \"type\":\"int\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"b\",\n" +
+ " \"type\":\"long\",\n" +
+ " \"default\":42\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ Schema originalSchema2 = Schema.parse(original2);
+ Schema evolvedSchema2 = Schema.parse(evolved2);
+
+ record = new GenericData.Record(originalSchema2);
+ record.put("a", 19);
+ assertTrue(GenericData.get().validate(originalSchema2, record));
+
+ r2 = schemaReEncoder.reencode(record, evolvedSchema2);
+ assertTrue(GenericData.get().validate(evolvedSchema2, r2));
+    assertEquals(42L, r2.get("b"));
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestThatEvolvedSchemasActAsWeWant.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestThatEvolvedSchemasActAsWeWant.java
new file mode 100644
index 0000000..69aab30
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestThatEvolvedSchemasActAsWeWant.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestThatEvolvedSchemasActAsWeWant {
+ @Test
+ public void resolvedSchemasShouldReturnReaderSchema() throws IOException {
+ // Need to verify that when reading a datum with an updated reader schema
+ // that the datum then returns the reader schema as its own, since we
+ // depend on this behavior in order to avoid re-encoding the datum
+ // in the serde.
+ String v0 = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"SomeStuff\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"v0\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ String v1 = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"SomeStuff\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"v0\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"v1\",\n" +
+ " \"type\":\"string\",\n" +
+ " \"default\":\"v1_default\"" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ Schema[] schemas = {Schema.parse(v0), Schema.parse(v1)};
+
+ // Encode a schema with v0, write out.
+ GenericRecord record = new GenericData.Record(schemas[0]);
+ record.put("v0", "v0 value");
+ assertTrue(GenericData.get().validate(schemas[0], record));
+
+ // Write datum out to a stream
+    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schemas[0]);
+    DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ dfw.create(schemas[0], baos);
+ dfw.append(record);
+ dfw.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
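+    // Resolve the file against the evolved (v1) schema when reading.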
+ gdr.setExpected(schemas[1]);
+    DataFileStream<GenericRecord> dfs = new DataFileStream<GenericRecord>(bais, gdr);
+ assertTrue(dfs.hasNext());
+ GenericRecord next = dfs.next();
+ assertEquals("v0 value", next.get("v0").toString());
+ assertEquals("v1_default", next.get("v1").toString());
+
+ // Now the most important check - when we query this record for its schema,
+ // we should get back the latest, reader schema:
+ assertEquals(schemas[1], next.getSchema());
+ }
+}
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java
new file mode 100644
index 0000000..9770a04
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+class Utils {
+ // Force Avro to serialize and de-serialize the record to make sure it has a
+ // chance to muck with the bytes and we're working against real Avro data.
+ public static AvroGenericRecordWritable
+ serializeAndDeserializeRecord(GenericData.Record record) throws IOException {
+ AvroGenericRecordWritable garw = new AvroGenericRecordWritable(record);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new DataOutputStream(baos);
+ garw.write(daos);
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dais = new DataInputStream(bais);
+
+ AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable();
+ garw2.readFields(dais);
+ return garw2;
+ }
+}