diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java index cade8c3..437c3e6 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java @@ -32,6 +32,10 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; +import org.apache.avro.UnresolvedUnionException; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Fixed; import org.apache.avro.generic.GenericDatumReader; @@ -41,7 +45,6 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.UnresolvedUnionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.type.HiveChar; @@ -76,6 +79,18 @@ * record encoding. */ private static boolean warnedOnce = false; + + /** + * Whether to check reader and writer schema for compatibility + * */ + private boolean schemaCompatibilityCheck = false; + + AvroDeserializer() {} + + AvroDeserializer(boolean schemaCompatibilityCheck) { + this.schemaCompatibilityCheck = schemaCompatibilityCheck; + } + /** * When encountering a record with an older schema than the one we're trying * to read, it is necessary to re-encode with a reader against the newer schema. @@ -119,19 +134,19 @@ public GenericRecord reencode(GenericRecord r) private List row; /** - * Deserialize an Avro record, recursing into its component fields and - * deserializing them as well. Fields of the record are matched by name - * against fields in the Hive row. 
+ * Deserialize an Avro record, recursing into its component fields and deserializing them as well. + * Fields of the record are matched by name against fields in the Hive row. * - * Because Avro has some data types that Hive does not, these are converted - * during deserialization to types Hive will work with. + * Because Avro has some data types that Hive does not, these are converted during deserialization + * to types Hive will work with. * * @param columnNames List of columns Hive is expecting from record. * @param columnTypes List of column types matched by index to names * @param writable Instance of GenericAvroWritable to deserialize * @param readerSchema Schema of the writable to deserialize * @return A list of objects suitable for Hive to work with further - * @throws AvroSerdeException For any exception during deseriliazation + * @throws AvroSerdeException For any exception during deserialization or if reader/writer schemas + * are incompatible */ public Object deserialize(List columnNames, List columnTypes, Writable writable, Schema readerSchema) throws AvroSerdeException { @@ -149,16 +164,33 @@ public Object deserialize(List columnNames, List columnTypes, GenericRecord r = recordWritable.getRecord(); Schema fileSchema = recordWritable.getFileSchema(); - UID recordReaderId = recordWritable.getRecordReaderID(); - //If the record reader (from which the record is originated) is already seen and valid, - //no need to re-encode the record. + UID recordReaderId = recordWritable.getRecordReaderID(); + // If the record reader (from which the record is originated) is already seen and valid, + // no need to re-encode the record. if(!noEncodingNeeded.contains(recordReaderId)) { SchemaReEncoder reEncoder = null; - //Check if the record record is already encoded once. If it does - //reuse the encoder. + // Check if the record is already encoded once. If so, + // reuse the encoder. 
if(reEncoderCache.containsKey(recordReaderId)) { reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema? + + if (schemaCompatibilityCheck) { + LOG.debug("Attempting to do a compatibility check"); + // Check that the evolved schema is compatible + SchemaPairCompatibility schemaCompatibility = + SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, r.getSchema()); + + if (schemaCompatibility.getType().equals(SchemaCompatibilityType.INCOMPATIBLE)) { + // Schemas are incompatible, Fail! + throw new AvroSerdeException("Incompatible reader/writer schemas encountered. Description: "+schemaCompatibility.getDescription()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Output of schema compatibility check: ["+schemaCompatibility.getType()+"]"); + } + } + //Create and store new encoder in the map for re-use reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema); reEncoderCache.put(recordReaderId, reEncoder); diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index efff663..36936ee 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -68,6 +68,7 @@ private List columnNames; private List columnTypes; private Schema schema; + private boolean checkSchemaCompatibility = false; private AvroDeserializer avroDeserializer = null; private AvroSerializer avroSerializer = null; @@ -110,6 +111,8 @@ public void initialize(Configuration configuration, Properties properties) throw properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString()); } + checkSchemaCompatibility = Boolean.parseBoolean(properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_COMPATIBILITY_CHECK.getPropName(), "false")); + LOG.info("Avro schema is " + schema); if (configuration == null) { @@ -214,7 +217,7 @@ public 
SerDeStats getSerDeStats() { private AvroDeserializer getDeserializer() { if(avroDeserializer == null) { - avroDeserializer = new AvroDeserializer(); + avroDeserializer = new AvroDeserializer(checkSchemaCompatibility); } return avroDeserializer; diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index 4edf654..9390e16 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -61,7 +61,8 @@ SCHEMA_NAME("avro.schema.name"), SCHEMA_DOC("avro.schema.doc"), AVRO_SERDE_SCHEMA("avro.serde.schema"), - SCHEMA_RETRIEVER("avro.schema.retriever"); + SCHEMA_RETRIEVER("avro.schema.retriever"), + SCHEMA_COMPATIBILITY_CHECK("avro.schema.compatibility.check"); private final String propName; diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java index 986b803..6537bb2 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java +++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java @@ -31,6 +31,7 @@ import java.util.Map; import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; import org.apache.avro.generic.GenericData; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -194,7 +195,7 @@ public void canDeserializeRecordsInternal(Schema s, Schema fileSchema) throws Se record.put("aRecord", innerRecord); assertTrue(GENERIC_DATA.validate(s, record)); - AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record, fileSchema); + AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record, s); AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s); @@ -230,12 +231,66 @@ public void 
canDeserializeRecordsInternal(Schema s, Schema fileSchema) throws Se assertEquals(42432234234l, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(2))); } + public void canDeserializeRecordsInternal(Schema s, Schema fileSchema, AvroGenericRecordWritable garw, boolean validate) throws SerDeException, IOException { + GenericData.Record record = new GenericData.Record(s); + GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema()); + innerRecord.put("int1", 42); + innerRecord.put("boolean1", true); + innerRecord.put("long1", 42432234234l); + record.put("aRecord", innerRecord); + assertTrue(GENERIC_DATA.validate(s, record)); + + AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s); + + AvroDeserializer de = new AvroDeserializer(validate); + ArrayList row = + (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s); + assertEquals(1, row.size()); + Object theRecordObject = row.get(0); + System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName()); + + // The original record was lost in the deserialization, so just go the + // correct way, through objectinspectors + StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector(); + List allStructFieldRefs = oi.getAllStructFieldRefs(); + assertEquals(1, allStructFieldRefs.size()); + StructField fieldRefForaRecord = allStructFieldRefs.get(0); + assertEquals("arecord", fieldRefForaRecord.getFieldName()); + Object innerRecord2 = oi.getStructFieldData(row, fieldRefForaRecord); + + // Extract innerRecord field refs + StandardStructObjectInspector innerRecord2OI = + (StandardStructObjectInspector) fieldRefForaRecord.getFieldObjectInspector(); + + List allStructFieldRefs1 = innerRecord2OI.getAllStructFieldRefs(); + assertEquals(3, allStructFieldRefs1.size()); + assertEquals("int1", allStructFieldRefs1.get(0).getFieldName()); + assertEquals("boolean1", 
allStructFieldRefs1.get(1).getFieldName()); + assertEquals("long1", allStructFieldRefs1.get(2).getFieldName()); + + innerRecord2OI.getStructFieldsDataAsList(innerRecord2); + assertEquals(42, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(0))); + assertEquals(true, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(1))); + assertEquals(42432234234l, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(2))); + } + @Test public void canDeserializeRecords() throws SerDeException, IOException { Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.RECORD_SCHEMA); canDeserializeRecordsInternal(s, s); } + @Test(expected=AvroSerdeException.class) + public void cannotDeserializeRecordsWithBadEvolvedSchema() throws SerDeException, IOException { + Schema readerSchema = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.RECORD_SCHEMA); + Schema fileSchema = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.RECORD_SCHEMA_BAD_EVOLUTION); + + GenericData.Record record = new GenericData.Record(fileSchema); + AvroGenericRecordWritable garw = new AvroGenericRecordWritable(record); + + canDeserializeRecordsInternal(readerSchema, fileSchema, garw, true); + } + @Test public void canDeserializeNullableRecords() throws SerDeException, IOException { Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.RECORD_SCHEMA); diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java index 3736a1f..088c4c3 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java +++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java @@ -100,6 +100,32 @@ " }\n" + " ]\n" + "}"; + public static final String RECORD_SCHEMA_BAD_EVOLUTION = "{\n" + + " \"namespace\": \"testing.test.mctesty\",\n" + 
+ " \"name\": \"twoRecord\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"aRecord\",\n" + + " \"type\":{\"type\":\"record\",\n" + + " \"name\":\"recordWithinARecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"int1\",\n" + + " \"type\":\"int\"\n" + + " },\n" + + " {\n" + + " \"name\":\"boolean1\",\n" + + " \"type\":\"boolean\"\n" + + " },\n" + + " {\n" + + " \"name\":\"long1\",\n" + + " \"type\":\"long\"\n" + + " }\n" + + " ]}\n" + + " }\n" + + " ]\n" + + "}"; public static final String NULLABLE_RECORD_SCHEMA = "[\"null\", " + RECORD_SCHEMA + "]"; public static final String UNION_SCHEMA = "{\n" + " \"namespace\": \"test.a.rossa\",\n" +