diff --git kafka-handler/README.md kafka-handler/README.md index 753e3e3..e7761e3 100644 --- kafka-handler/README.md +++ kafka-handler/README.md @@ -50,6 +50,9 @@ ALTER TABLE SET TBLPROPERTIES ( "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe"); ``` + +If you use Confluent's Avro serialzier or deserializer with the Confluent Schema Registry, you will need to remove five bytes from the beginning of each message. These five bytes represent [a magic byte and a four-byte schema ID from the registry.](https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format) +This can be done by setting `"avro.serde.type"="skip"` and `"avro.serde.skip.bytes"="5"`. In this case it is also recommended to set the Avro schema either via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}`. If both properties are set then `avro.schema.literal` has higher priority. List of supported serializers and deserializers: diff --git kafka-handler/pom.xml kafka-handler/pom.xml index 6ad41de..4e58cb9 100644 --- kafka-handler/pom.xml +++ kafka-handler/pom.xml @@ -52,6 +52,11 @@ + org.apache.hive + hive-serde + ${project.version} + + com.google.guava guava @@ -60,9 +65,9 @@ hadoop-common - org.slf4j - slf4j-api - + org.slf4j + slf4j-api + @@ -80,6 +85,10 @@ kafka-clients ${kafka.version} + + org.apache.avro + avro + junit @@ -118,8 +127,27 @@ 1.7.30 test + + io.confluent + kafka-avro-serializer + 5.4.0 + test + + + org.apache.avro + avro + + + + + + confluent + http://packages.confluent.io/maven/ + + + dev-fast-build @@ -179,16 +207,44 @@ ${basedir}/src/test - org.apache.maven.plugins - maven-jar-plugin + org.apache.avro + avro-maven-plugin + generate-test-sources - test-jar + schema + + org.apache.maven.plugins + maven-jar-plugin + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-test-sources + + schema + + + + + ${project.basedir}/src/test/resources + ${project.basedir}/src/test/gen + true + String + + + + diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java index ffe7788..ee20b49 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java @@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -133,12 +134,46 @@ Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further"); Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty); LOG.debug("Building Avro Reader with schema {}", schemaFromProperty); - bytesConverter = new AvroBytesConverter(schema); + bytesConverter = getByteConverterForAvroDelegate(schema, tbl); } else { bytesConverter = new BytesWritableConverter(); } } + enum BytesConverterType { + SKIP, + NONE; + + static BytesConverterType fromString(String value) { + try { + return BytesConverterType.valueOf(value.trim().toUpperCase()); + } catch (Exception e){ + return NONE; + } + } + } + + BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) throws SerDeException { + String avroBytesConverterPropertyName = 
AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(); + String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, + BytesConverterType.NONE.toString()); + BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty); + switch (avroByteConverterType) { + case SKIP: + String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(); + Integer avroSkipBytes = 0; + try { + avroSkipBytes = Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName)); + } catch (NumberFormatException e) { + String message = "Value of " + avroSkipBytesPropertyName + " could not be parsed into an integer properly."; + throw new SerDeException(message, e); + } + return new AvroSkipBytesConverter(schema, avroSkipBytes); + case NONE: return new AvroBytesConverter(schema); + default: throw new SerDeException("Value of " + avroBytesConverterPropertyName + " was invalid."); + } + } + @Override public Class getSerializedClass() { return delegateSerDe.getSerializedClass(); } @@ -327,7 +362,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { K getWritable(byte[] value); } - private static class AvroBytesConverter implements BytesConverter { + static class AvroBytesConverter implements BytesConverter { private final Schema schema; private final DatumReader dataReader; private final GenericDatumWriter gdw = new GenericDatumWriter<>(); @@ -354,12 +389,18 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { return valueBytes; } + Decoder getDecoder(byte[] value) throws SerDeException { + return DecoderFactory.get().binaryDecoder(value, null); + } + @Override public AvroGenericRecordWritable getWritable(byte[] value) { GenericRecord avroRecord = null; try { - avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null)); + avroRecord = dataReader.read(null, getDecoder(value)); } catch (IOException e) { Throwables.propagate(new SerDeException(e)); + } catch (SerDeException e) { + Throwables.propagate(e); } avroGenericRecordWritable.setRecord(avroRecord); @@ -369,6 +410,30 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { } } + /** + * Avro converter which skips the first @skipBytes of each message. + * + * This may be needed for various serializers, such as the Confluent Avro serializer, which uses the first five + * bytes to indicate a magic byte, as well as a four byte schema ID. 
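+   *
+   * Per the README in this change, a table enables this converter by setting
+   * {@code "avro.serde.type"="skip"} together with {@code "avro.serde.skip.bytes"="5"}.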
+ */ + static class AvroSkipBytesConverter extends AvroBytesConverter { + private final int skipBytes; + + AvroSkipBytesConverter(Schema schema, int skipBytes) { + super(schema); + this.skipBytes = skipBytes; + } + + @Override + Decoder getDecoder(byte[] value) throws SerDeException { + try { + return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null); + } catch (ArrayIndexOutOfBoundsException e) { + throw new SerDeException("Skip bytes value is larger than the message length.", e); + } + } + } + private static class BytesWritableConverter implements BytesConverter { @Override public byte[] getBytes(BytesWritable writable) { return writable.getBytes(); diff --git kafka-handler/src/test/gen/org/apache/hadoop/hive/kafka/SimpleRecord.java kafka-handler/src/test/gen/org/apache/hadoop/hive/kafka/SimpleRecord.java new file mode 100644 index 0000000..112ee71 --- /dev/null +++ kafka-handler/src/test/gen/org/apache/hadoop/hive/kafka/SimpleRecord.java @@ -0,0 +1,308 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.hadoop.hive.kafka; + +import org.apache.avro.specific.SpecificData; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class SimpleRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 5535943992250633953L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.hadoop.hive.kafka\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageDecoder instance used by this class. + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** Serializes this SimpleRecord to a ByteBuffer. */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** Deserializes a SimpleRecord from a ByteBuffer. */ + public static SimpleRecord fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + @Deprecated public java.lang.String id; + @Deprecated public java.lang.String name; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public SimpleRecord() {} + + /** + * All-args constructor. 
+ * @param id The new value for id + * @param name The new value for name + */ + public SimpleRecord(java.lang.String id, java.lang.String name) { + this.id = id; + this.name = name; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return id; + case 1: return name; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: id = (java.lang.String)value$; break; + case 1: name = (java.lang.String)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'id' field. + * @return The value of the 'id' field. + */ + public java.lang.String getId() { + return id; + } + + /** + * Sets the value of the 'id' field. + * @param value the value to set. + */ + public void setId(java.lang.String value) { + this.id = value; + } + + /** + * Gets the value of the 'name' field. + * @return The value of the 'name' field. + */ + public java.lang.String getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * @param value the value to set. + */ + public void setName(java.lang.String value) { + this.name = value; + } + + /** + * Creates a new SimpleRecord RecordBuilder. + * @return A new SimpleRecord RecordBuilder + */ + public static org.apache.hadoop.hive.kafka.SimpleRecord.Builder newBuilder() { + return new org.apache.hadoop.hive.kafka.SimpleRecord.Builder(); + } + + /** + * Creates a new SimpleRecord RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new SimpleRecord RecordBuilder + */ + public static org.apache.hadoop.hive.kafka.SimpleRecord.Builder newBuilder(org.apache.hadoop.hive.kafka.SimpleRecord.Builder other) { + return new org.apache.hadoop.hive.kafka.SimpleRecord.Builder(other); + } + + /** + * Creates a new SimpleRecord RecordBuilder by copying an existing SimpleRecord instance. + * @param other The existing instance to copy. + * @return A new SimpleRecord RecordBuilder + */ + public static org.apache.hadoop.hive.kafka.SimpleRecord.Builder newBuilder(org.apache.hadoop.hive.kafka.SimpleRecord other) { + return new org.apache.hadoop.hive.kafka.SimpleRecord.Builder(other); + } + + /** + * RecordBuilder for SimpleRecord instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.lang.String id; + private java.lang.String name; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(org.apache.hadoop.hive.kafka.SimpleRecord.Builder other) { + super(other); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.name)) { + this.name = data().deepCopy(fields()[1].schema(), other.name); + fieldSetFlags()[1] = true; + } + } + + /** + * Creates a Builder by copying an existing SimpleRecord instance + * @param other The existing instance to copy. 
+ */ + private Builder(org.apache.hadoop.hive.kafka.SimpleRecord other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.name)) { + this.name = data().deepCopy(fields()[1].schema(), other.name); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'id' field. + * @return The value. + */ + public java.lang.String getId() { + return id; + } + + /** + * Sets the value of the 'id' field. + * @param value The value of 'id'. + * @return This builder. + */ + public org.apache.hadoop.hive.kafka.SimpleRecord.Builder setId(java.lang.String value) { + validate(fields()[0], value); + this.id = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'id' field has been set. + * @return True if the 'id' field has been set, false otherwise. + */ + public boolean hasId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'id' field. + * @return This builder. + */ + public org.apache.hadoop.hive.kafka.SimpleRecord.Builder clearId() { + id = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'name' field. + * @return The value. + */ + public java.lang.String getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * @param value The value of 'name'. + * @return This builder. + */ + public org.apache.hadoop.hive.kafka.SimpleRecord.Builder setName(java.lang.String value) { + validate(fields()[1], value); + this.name = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'name' field has been set. + * @return True if the 'name' field has been set, false otherwise. + */ + public boolean hasName() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'name' field. + * @return This builder. + */ + public org.apache.hadoop.hive.kafka.SimpleRecord.Builder clearName() { + name = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public SimpleRecord build() { + try { + SimpleRecord record = new SimpleRecord(); + record.id = fieldSetFlags()[0] ? this.id : (java.lang.String) defaultValue(fields()[0]); + record.name = fieldSetFlags()[1] ? 
this.name : (java.lang.String) defaultValue(fields()[1]); + return record; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + +} diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java new file mode 100644 index 0000000..da1108f --- /dev/null +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.Maps; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +import org.apache.avro.Schema; + +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.SerDeException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Test class for Hive Kafka Avro SerDe with variable bytes skipped. + */ +public class AvroBytesConverterTest { + private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build(); + private static byte[] simpleRecordConfluentBytes; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + /** + * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord. 
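+   * The serialized bytes carry the Confluent wire-format header (one magic byte plus a
+   * four-byte schema ID) ahead of the Avro payload, which is what the SKIP converter removes.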
+ */ + @BeforeClass + public static void setUp() { + Map config = Maps.newHashMap(); + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); + KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient()); + avroSerializer.configure(config, false); + simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord); + } + + private void runConversionTest(Properties tbl, byte[] serializedSimpleRecord) throws SerDeException { + KafkaSerDe serde = new KafkaSerDe(); + Schema schema = SimpleRecord.getClassSchema(); + KafkaSerDe.AvroBytesConverter conv = (KafkaSerDe.AvroBytesConverter)serde.getByteConverterForAvroDelegate( + schema, tbl); + AvroGenericRecordWritable simpleRecordWritable = conv.getWritable(serializedSimpleRecord); + + Assert.assertNotNull(simpleRecordWritable); + Assert.assertEquals(SimpleRecord.class, simpleRecordWritable.getRecord().getClass()); + + SimpleRecord simpleRecordDeserialized = (SimpleRecord) simpleRecordWritable.getRecord(); + + Assert.assertNotNull(simpleRecordDeserialized); + Assert.assertEquals(simpleRecord, simpleRecordDeserialized); + } + + /** + * Tests the default case of no skipped bytes per record works properly. + */ + @Test + public void convertWithAvroBytesConverter() throws SerDeException { + // Since the serialized version was created by Confluent, + // let's remove the first five bytes to get the actual message. + int recordLength = simpleRecordConfluentBytes.length; + byte[] simpleRecordWithNoOffset = Arrays.copyOfRange(simpleRecordConfluentBytes, 5, recordLength); + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "NONE"); + + runConversionTest(tbl, simpleRecordWithNoOffset); + } + + /** + * Tests that the skip converter skips 5 bytes properly, which matches what Confluent needs. + */ + @Test + public void convertWithConfluentAvroBytesConverter() throws SerDeException { + Integer offset = 5; + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP"); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString()); + + runConversionTest(tbl, simpleRecordConfluentBytes); + } + + /** + * Tests that the skip converter skips a custom number of bytes properly. + */ + @Test + public void convertWithCustomAvroSkipBytesConverter() throws SerDeException { + Integer offset = 2; + // Remove all but two bytes of the five byte offset which Confluent adds, + // to simulate a message with only 2 bytes in front of each message. + int recordLength = simpleRecordConfluentBytes.length; + byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecordConfluentBytes, 5 - offset, recordLength); + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP"); tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString()); + + runConversionTest(tbl, simpleRecordAsOffsetBytes); + } + + /** + * Test that when we skip more bytes than are in the message, we throw an exception properly. + */ + @Test + public void skipBytesLargerThanMessageSizeConverter() throws SerDeException { + // The simple record we are serializing is two strings, that combine to be 7 characters or 14 bytes. + // Adding in the 5 byte offset, we get 19 bytes. 
To make sure we go bigger than that, we are setting + // the offset to ten times that value. + Integer offset = 190; + + Properties tbl = new Properties(); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP"); + tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString()); + + exception.expect(RuntimeException.class); + exception.expectMessage("org.apache.hadoop.hive.serde2.SerDeException: " + + "Skip bytes value is larger than the message length."); + runConversionTest(tbl, simpleRecordConfluentBytes); + } + + /** + * Test that we properly parse the converter type, no matter the casing. + */ + @Test + public void bytesConverterTypeParseTest() { + Map testCases = new HashMap() {{ + put("skip", KafkaSerDe.BytesConverterType.SKIP); + put("sKIp", KafkaSerDe.BytesConverterType.SKIP); + put("SKIP", KafkaSerDe.BytesConverterType.SKIP); + put(" skip ", KafkaSerDe.BytesConverterType.SKIP); + put("SKIP1", KafkaSerDe.BytesConverterType.NONE); + put("skipper", KafkaSerDe.BytesConverterType.NONE); + put("", KafkaSerDe.BytesConverterType.NONE); + put(null, KafkaSerDe.BytesConverterType.NONE); + put("none", KafkaSerDe.BytesConverterType.NONE); + put("NONE", KafkaSerDe.BytesConverterType.NONE); + }}; + + for(Map.Entry entry: testCases.entrySet()) { + Assert.assertEquals(entry.getValue(), KafkaSerDe.BytesConverterType.fromString(entry.getKey())); + } + } +} diff --git kafka-handler/src/test/resources/SimpleRecord.avsc kafka-handler/src/test/resources/SimpleRecord.avsc new file mode 100644 index 0000000..47b6156 --- /dev/null +++ kafka-handler/src/test/resources/SimpleRecord.avsc @@ -0,0 +1,13 @@ +{ + "type" : "record", + "name" : "SimpleRecord", + "namespace" : "org.apache.hadoop.hive.kafka", + "fields" : [ { + "name" : "id", + "type" : "string" + }, { + "name" : "name", + "type" : "string" + } + ] +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index d16abdb..b540073 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -68,6 +68,8 @@ SCHEMA_NAME("avro.schema.name"), SCHEMA_DOC("avro.schema.doc"), AVRO_SERDE_SCHEMA("avro.serde.schema"), + AVRO_SERDE_TYPE("avro.serde.type"), + AVRO_SERDE_SKIP_BYTES("avro.serde.skip.bytes"), SCHEMA_RETRIEVER("avro.schema.retriever"); private final String propName;