diff --git kafka-handler/README.md kafka-handler/README.md
index 753e3e3..b93a1c4 100644
--- kafka-handler/README.md
+++ kafka-handler/README.md
@@ -50,6 +50,9 @@ ALTER TABLE
SET TBLPROPERTIES (
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
```
+
+If you use the Confluent Avro serializer/deserializer together with a schema registry, each message value starts with a 5-byte prefix (one magic byte plus a 4-byte schema ID from the registry) that must be removed before the Avro payload can be decoded.
+This can be done by setting `"avro.serde.type"="confluent"`, or `"avro.serde.type"="skip"` together with `"avro.serde.skip.bytes"="5"`. It is recommended to provide the Avro schema via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}`. If both properties are set, `avro.schema.literal` takes precedence.
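
A minimal sketch of how the new properties might be combined, following the `ALTER TABLE` pattern shown above (the table name and schema URL are placeholders, not part of this change):

```
ALTER TABLE
  kafka_table
SET TBLPROPERTIES (
  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
  "avro.serde.type" = "confluent",
  "avro.schema.url" = "http://hostname/SimpleDocument.avsc");
```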
List of supported serializers and deserializers:
diff --git kafka-handler/pom.xml kafka-handler/pom.xml
index 6ad41de..a65187f 100644
--- kafka-handler/pom.xml
+++ kafka-handler/pom.xml
@@ -118,8 +118,21 @@
       <version>1.7.30</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>5.4.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>http://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
+
   <profiles>
     <profile>
       <id>dev-fast-build</id>
@@ -179,6 +192,10 @@
         <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
         <plugins>
           <plugin>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-maven-plugin</artifactId>
+          </plugin>
+          <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-jar-plugin</artifactId>
@@ -190,5 +207,27 @@
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.8.1</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <sourceDirectory>${project.basedir}/src/resources/</sourceDirectory>
+          <enableDecimalLogicalType>true</enableDecimalLogicalType>
+          <stringType>String</stringType>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
index ffe7788..3db2b75 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -25,6 +25,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
@@ -133,12 +134,40 @@
Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
- bytesConverter = new AvroBytesConverter(schema);
+ bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
} else {
bytesConverter = new BytesWritableConverter();
}
}
+ enum BytesConverterType {
+ CONFLUENT,
+ SKIP,
+ NONE;
+
+ static BytesConverterType fromString(String value) {
+ try {
+ return BytesConverterType.valueOf(value.trim().toUpperCase());
+ } catch (Exception e){
+ return NONE;
+ }
+ }
+ }
+
+ BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+ String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+ .AvroTableProperties.AVRO_SERDE_TYPE
+ .getPropName(), BytesConverterType.NONE.toString());
+ BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+ int avroSkipBytes = Integer.parseInt(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
+ .getPropName(), "0"));
+ switch (avroByteConverterType) {
+ case CONFLUENT: return new AvroSkipBytesConverter(schema, 5);
+ case SKIP: return new AvroSkipBytesConverter(schema, avroSkipBytes);
+ default: return new AvroBytesConverter(schema);
+ }
+ }
+
@Override public Class<? extends Writable> getSerializedClass() {
return delegateSerDe.getSerializedClass();
}
@@ -327,7 +356,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
K getWritable(byte[] value);
}
- private static class AvroBytesConverter implements BytesConverter<AvroGenericRecordWritable> {
+ static class AvroBytesConverter implements BytesConverter<AvroGenericRecordWritable> {
 private final Schema schema;
 private final DatumReader<GenericRecord> dataReader;
 private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>();
@@ -336,7 +365,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
AvroBytesConverter(Schema schema) {
this.schema = schema;
- dataReader = new SpecificDatumReader<>(this.schema);
+ this.dataReader = new SpecificDatumReader<>(this.schema);
}
@Override public byte[] getBytes(AvroGenericRecordWritable writable) {
@@ -354,10 +383,14 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
return valueBytes;
}
+ Decoder getDecoder(byte[] value) {
+ return DecoderFactory.get().binaryDecoder(value, null);
+ }
+
@Override public AvroGenericRecordWritable getWritable(byte[] value) {
GenericRecord avroRecord = null;
try {
- avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+ avroRecord = dataReader.read(null, getDecoder(value));
} catch (IOException e) {
Throwables.propagate(new SerDeException(e));
}
@@ -369,6 +402,26 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
}
}
+ /**
+ * A converter that reads the Kafka message value and skips the first {@code skipBytes} bytes before decoding the Avro record.
+ *
+ * For example:
+ * The Confluent Avro producer prepends 5 bytes to each message: one magic byte followed by a 4-byte schema ID from the schema registry.
+ */
+ static class AvroSkipBytesConverter extends AvroBytesConverter {
+ private final int skipBytes;
+
+ AvroSkipBytesConverter(Schema schema, int skipBytes) {
+ super(schema);
+ this.skipBytes = skipBytes;
+ }
+
+ @Override
+ Decoder getDecoder(byte[] value) {
+ return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null);
+ }
+ }
+
 private static class BytesWritableConverter implements BytesConverter<BytesWritable> {
@Override public byte[] getBytes(BytesWritable writable) {
return writable.getBytes();
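
For context on what the new `AvroSkipBytesConverter` strips, here is a small illustrative sketch (not part of the patch) of the Confluent wire format that the `confluent` setting assumes: one magic byte, a 4-byte schema ID, and then the plain binary-encoded Avro payload. Skipping the first 5 bytes leaves exactly the bytes that `AvroBytesConverter` expects.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: the framing produced by a Confluent Avro producer and
// assumed by AvroSkipBytesConverter(schema, 5).
public class ConfluentFramingSketch {
  public static void main(String[] args) {
    byte[] avroPayload = {1, 2, 3};   // stand-in for a binary-encoded Avro record
    int schemaId = 42;                // hypothetical schema registry ID

    // byte 0: magic byte (0), bytes 1-4: schema ID, bytes 5..: Avro payload
    byte[] framed = ByteBuffer.allocate(5 + avroPayload.length)
        .put((byte) 0)
        .putInt(schemaId)
        .put(avroPayload)
        .array();

    // Skipping the 5-byte header recovers the original Avro payload,
    // which is what the skip converter hands to the Avro decoder.
    byte[] recovered = Arrays.copyOfRange(framed, 5, framed.length);
    System.out.println(Arrays.equals(avroPayload, recovered));   // true
  }
}
```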
diff --git kafka-handler/src/resources/SimpleRecord.avsc kafka-handler/src/resources/SimpleRecord.avsc
new file mode 100644
index 0000000..47b6156
--- /dev/null
+++ kafka-handler/src/resources/SimpleRecord.avsc
@@ -0,0 +1,13 @@
+{
+ "type" : "record",
+ "name" : "SimpleRecord",
+ "namespace" : "org.apache.hadoop.hive.kafka",
+ "fields" : [ {
+ "name" : "id",
+ "type" : "string"
+ }, {
+ "name" : "name",
+ "type" : "string"
+ }
+ ]
+}
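
The avro-maven-plugin execution added to the pom compiles this schema into a generated `SimpleRecord` class at build time. A short sketch of how the generated builder is used (mirroring the test below):

```java
package org.apache.hadoop.hive.kafka;

// Sketch: SimpleRecord is generated from SimpleRecord.avsc by the avro-maven-plugin
// and exposes an Avro builder, which the test below uses to create fixture records.
public class SimpleRecordUsageSketch {
  public static void main(String[] args) {
    SimpleRecord record = SimpleRecord.newBuilder()
        .setId("123")
        .setName("test")
        .build();
    System.out.println(record);   // {"id": "123", "name": "test"}
  }
}
```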
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
new file mode 100644
index 0000000..6728dd6
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+ private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+ private static byte[] simpleRecord1AsBytes;
+
+ /**
+ * Emulates the Confluent Avro producer, which prepends 5 bytes to the value: one magic byte followed by a 4-byte int that represents the schema ID from the schema registry.
+ */
+ @BeforeClass
+ public static void setUp() {
+ Map<String, Object> config = Maps.newHashMap();
+ config.put("schema.registry.url", "http://localhost");
+ KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+ avroSerializer.configure(config, false);
+ simpleRecord1AsBytes = avroSerializer.serialize("temp", simpleRecord1);
+ }
+
+ /**
+ * Emulates avro.serde.type = none (the default).
+ */
+ @Test
+ public void convertWithAvroBytesConverter() {
+ Schema schema = SimpleRecord.getClassSchema();
+ KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+ AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+ Assert.assertNotNull(simpleRecord1Writable);
+ Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+ SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+ Assert.assertNotNull(simpleRecord1Deserialized);
+ Assert.assertNotEquals(simpleRecord1, simpleRecord1Deserialized);
+ }
+
+ /**
+ * Emulates avro.serde.type = confluent.
+ */
+ @Test
+ public void convertWithConfluentAvroBytesConverter() {
+ Schema schema = SimpleRecord.getClassSchema();
+ KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+ AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+ Assert.assertNotNull(simpleRecord1Writable);
+ Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+ SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+ Assert.assertNotNull(simpleRecord1Deserialized);
+ Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
+ }
+
+ /**
+ * Emulates avro.serde.type = skip.
+ */
+ @Test
+ public void convertWithCustomAvroSkipBytesConverter() {
+ int offset = 2;
+ byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecord1AsBytes, 5 - offset, simpleRecord1AsBytes.length);
+
+ Schema schema = SimpleRecord.getClassSchema();
+ KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, offset);
+ AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecordAsOffsetBytes);
+
+ Assert.assertNotNull(simpleRecord1Writable);
+ Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+ SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+ Assert.assertNotNull(simpleRecord1Deserialized);
+ Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
+ }
+
+ @Test
+ public void enumParseTest() {
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.CONFLUENT,
+ KafkaSerDe.BytesConverterType.fromString("confluent"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.CONFLUENT,
+ KafkaSerDe.BytesConverterType.fromString("conFLuent"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.CONFLUENT,
+ KafkaSerDe.BytesConverterType.fromString("Confluent"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.CONFLUENT,
+ KafkaSerDe.BytesConverterType.fromString("CONFLUENT"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.SKIP, KafkaSerDe.BytesConverterType.fromString("skip"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.SKIP, KafkaSerDe.BytesConverterType.fromString("sKIp"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.SKIP, KafkaSerDe.BytesConverterType.fromString("SKIP"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.NONE, KafkaSerDe.BytesConverterType.fromString("SKIP1"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.NONE, KafkaSerDe.BytesConverterType.fromString("skipper"));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.NONE, KafkaSerDe.BytesConverterType.fromString(""));
+ Assert.assertEquals(KafkaSerDe.BytesConverterType.NONE, KafkaSerDe.BytesConverterType.fromString(null));
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index d16abdb..b540073 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -68,6 +68,8 @@
SCHEMA_NAME("avro.schema.name"),
SCHEMA_DOC("avro.schema.doc"),
AVRO_SERDE_SCHEMA("avro.serde.schema"),
+ AVRO_SERDE_TYPE("avro.serde.type"),
+ AVRO_SERDE_SKIP_BYTES("avro.serde.skip.bytes"),
SCHEMA_RETRIEVER("avro.schema.retriever");
private final String propName;