Index: kafka-handler/README.md
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- kafka-handler/README.md	(revision 1327d47a5d325179420cc2de15f7535bc9d10040)
+++ kafka-handler/README.md	(revision ea94e4f9e4291e8b5cd6294c609d13db82cb46f8)
@@ -25,6 +25,17 @@
 ```sql
 ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use the Confluent Avro serializer/deserializer with Schema Registry, you may want to skip the first 5 bytes of each message, which hold the magic byte and the 4-byte schema ID assigned by the registry.
+This can be done by setting `"avro.serde.type"="confluent"`, or `"avro.serde.type"="skip"` together with `"avro.serde.skip.bytes"="5"`. It is recommended to provide an Avro schema via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}`. If both properties are set, `avro.schema.literal` takes precedence.
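+For example, to read a topic written with the Confluent Avro serializer (the table name and schema URL are placeholders, as above):
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES (
+  "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
+  "avro.serde.type"="confluent",
+  "avro.schema.url"="http://hostname/SimpleDocument.avsc");
+```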
.getPropName(), "5")); + switch ( avroByteConverterType ) { + case "confluent" : return new AvroSkipBytesConverter(schema, 5); + case "skip" : return new AvroSkipBytesConverter(schema, avroSkipBytes); + default : return new AvroBytesConverter(schema); + } + } + @Override public Class getSerializedClass() { return delegateSerDe.getSerializedClass(); } @@ -327,7 +340,7 @@ K getWritable(byte[] value); } - private static class AvroBytesConverter implements BytesConverter { + static class AvroBytesConverter implements BytesConverter { private final Schema schema; private final DatumReader dataReader; private final GenericDatumWriter gdw = new GenericDatumWriter<>(); @@ -336,7 +349,7 @@ AvroBytesConverter(Schema schema) { this.schema = schema; - dataReader = new SpecificDatumReader<>(this.schema); + this.dataReader = new SpecificDatumReader<>(this.schema); } @Override public byte[] getBytes(AvroGenericRecordWritable writable) { @@ -354,10 +367,14 @@ return valueBytes; } + Decoder getDecoder(byte[] value) { + return DecoderFactory.get().binaryDecoder(value, null); + } + @Override public AvroGenericRecordWritable getWritable(byte[] value) { GenericRecord avroRecord = null; try { - avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null)); + avroRecord = dataReader.read(null, getDecoder(value)); } catch (IOException e) { Throwables.propagate(new SerDeException(e)); } @@ -369,6 +386,26 @@ } } + /** + * The converter reads bytes from kafka message and skip first @skipBytes from beginning. + * + * For example: + * Confluent kafka producer add 5 magic bytes that represents Schema ID as Integer to the message. + */ + static class AvroSkipBytesConverter extends AvroBytesConverter { + private final int skipBytes; + + AvroSkipBytesConverter(Schema schema, int skipBytes) { + super(schema); + this.skipBytes = skipBytes; + } + + @Override + Decoder getDecoder(byte[] value) { + return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null); + } + } + private static class BytesWritableConverter implements BytesConverter { @Override public byte[] getBytes(BytesWritable writable) { return writable.getBytes(); Index: kafka-handler/src/resources/SimpleRecord.avsc IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- kafka-handler/src/resources/SimpleRecord.avsc (revision 1d25507e3f7f74425cf80343909382d120eadc85) +++ kafka-handler/src/resources/SimpleRecord.avsc (revision 1d25507e3f7f74425cf80343909382d120eadc85) @@ -0,0 +1,13 @@ +{ + "type" : "record", + "name" : "SimpleRecord", + "namespace" : "org.apache.hadoop.hive.kafka", + "fields" : [ { + "name" : "id", + "type" : "string" + }, { + "name" : "name", + "type" : "string" + } + ] +} Index: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java (revision ea94e4f9e4291e8b5cd6294c609d13db82cb46f8) +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java (revision ea94e4f9e4291e8b5cd6294c609d13db82cb46f8) @@ -0,0 +1,70 @@ +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.Maps; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import 
+   */
+  static class AvroSkipBytesConverter extends AvroBytesConverter {
+    private final int skipBytes;
+
+    AvroSkipBytesConverter(Schema schema, int skipBytes) {
+      super(schema);
+      this.skipBytes = skipBytes;
+    }
+
+    @Override
+    Decoder getDecoder(byte[] value) {
+      return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null);
+    }
+  }
+
   private static class BytesWritableConverter implements BytesConverter<BytesWritable> {
     @Override public byte[] getBytes(BytesWritable writable) {
       return writable.getBytes();
Index: kafka-handler/src/resources/SimpleRecord.avsc
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- kafka-handler/src/resources/SimpleRecord.avsc	(revision 1d25507e3f7f74425cf80343909382d120eadc85)
+++ kafka-handler/src/resources/SimpleRecord.avsc	(revision 1d25507e3f7f74425cf80343909382d120eadc85)
@@ -0,0 +1,13 @@
+{
+  "type" : "record",
+  "name" : "SimpleRecord",
+  "namespace" : "org.apache.hadoop.hive.kafka",
+  "fields" : [ {
+    "name" : "id",
+    "type" : "string"
+  }, {
+    "name" : "name",
+    "type" : "string"
+  }
+ ]
+}
Index: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java	(revision ea94e4f9e4291e8b5cd6294c609d13db82cb46f8)
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java	(revision ea94e4f9e4291e8b5cd6294c609d13db82cb46f8)
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Test class for the Hive Kafka Avro bytes converters.
+ */
+public class AvroBytesConverterTest {
+
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulates a Confluent Avro producer, which prepends 5 bytes (a magic byte plus the 4-byte schema ID from the schema registry) to the value bytes.
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put("schema.registry.url", "http://localhost");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecord1AsBytes = avroSerializer.serialize("temp", simpleRecord1);
+  }
+
+  /**
+   * Emulates avro.serde.type = none (the default): the 5-byte Confluent header is not skipped, so the decoded record differs from the original.
+   */
+  @Test
+  public void convertWithAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertNotEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+
+  /**
+   * Emulates avro.serde.type = confluent: the 5-byte header is skipped and the record round-trips intact.
+   */
+  @Test
+  public void convertWithConfluentAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+}
Index: serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java	(revision 1327d47a5d325179420cc2de15f7535bc9d10040)
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java	(revision ea94e4f9e4291e8b5cd6294c609d13db82cb46f8)
@@ -68,6 +68,8 @@
     SCHEMA_NAME("avro.schema.name"),
     SCHEMA_DOC("avro.schema.doc"),
     AVRO_SERDE_SCHEMA("avro.serde.schema"),
+    AVRO_SERDE_TYPE("avro.serde.type"),
+    AVRO_SERDE_SKIP_BYTES("avro.serde.skip.bytes"),
     SCHEMA_RETRIEVER("avro.schema.retriever");
 
     private final String propName;