diff --git kafka-handler/README.md kafka-handler/README.md
index 753e3e3..e7761e3 100644
--- kafka-handler/README.md
+++ kafka-handler/README.md
@@ -50,6 +50,9 @@ ALTER TABLE
SET TBLPROPERTIES (
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
```
+
+If you use Confluent's Avro serializer or deserializer together with the Confluent Schema Registry, you will need to remove five bytes from the beginning of each message. These five bytes represent [a magic byte and a four-byte schema ID from the registry](https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format).
+This can be done by setting `"avro.serde.type"="skip"` and `"avro.serde.skip.bytes"="5"`. In this case it is also recommended to set the Avro schema, either via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or via `"avro.schema.literal"='{"type": "record", "name": "SimpleRecord", ...}'`. If both properties are set, `avro.schema.literal` takes precedence.
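+
+For example, a table backed by Confluent-serialized Avro messages could be configured as follows (a minimal sketch, assuming a table named `kafka_table`; the schema URL is illustrative):
+
+```
+ALTER TABLE
+  kafka_table
+SET TBLPROPERTIES (
+  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
+  "avro.serde.type" = "skip",
+  "avro.serde.skip.bytes" = "5",
+  "avro.schema.url" = "http://hostname/SimpleRecord.avsc");
+```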
List of supported serializers and deserializers:
diff --git kafka-handler/pom.xml kafka-handler/pom.xml
index 6ad41de..4e58cb9 100644
--- kafka-handler/pom.xml
+++ kafka-handler/pom.xml
@@ -52,6 +52,11 @@
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
@@ -60,9 +65,9 @@
       <artifactId>hadoop-common</artifactId>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-api</artifactId>
-      </exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
@@ -80,6 +85,10 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
@@ -118,8 +127,27 @@
       <version>1.7.30</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>5.4.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
+
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <url>http://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>

   <profiles>
     <profile>
       <id>dev-fast-build</id>
@@ -179,16 +207,44 @@
     <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
         <executions>
           <execution>
             <phase>generate-test-sources</phase>
             <goals>
-              <goal>test-jar</goal>
+              <goal>schema</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <sourceDirectory>${project.basedir}/src/test/resources</sourceDirectory>
+          <outputDirectory>${project.basedir}/src/test/gen</outputDirectory>
+          <createSetters>true</createSetters>
+          <stringType>String</stringType>
+        </configuration>
+      </plugin>
     </plugins>
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
index ffe7788..ee20b49 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -25,6 +25,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
@@ -133,12 +134,46 @@
Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
- bytesConverter = new AvroBytesConverter(schema);
+ bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
} else {
bytesConverter = new BytesWritableConverter();
}
}
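+  /**
+   * How raw Kafka payload bytes are handed to the Avro reader: SKIP drops a configured number of
+   * leading bytes (for example the five-byte Confluent wire-format header) before decoding,
+   * NONE decodes the payload as-is.
+   */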
+ enum BytesConverterType {
+ SKIP,
+ NONE;
+
+ static BytesConverterType fromString(String value) {
+ try {
+ return BytesConverterType.valueOf(value.trim().toUpperCase());
+ } catch (Exception e){
+ return NONE;
+ }
+ }
+ }
+
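+  /**
+   * Chooses the Avro bytes converter based on the avro.serde.type table property: "skip" returns an
+   * AvroSkipBytesConverter configured from avro.serde.skip.bytes, otherwise the plain AvroBytesConverter is used.
+   */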
+ BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) throws SerDeException {
+ String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName();
+ String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName,
+ BytesConverterType.NONE.toString());
+ BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+ switch (avroByteConverterType) {
+ case SKIP:
+ String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName();
+ Integer avroSkipBytes = 0;
+ try {
+ avroSkipBytes = Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName));
+ } catch (NumberFormatException e) {
+ String message = "Value of " + avroSkipBytesPropertyName + " could not be parsed into an integer properly.";
+ throw new SerDeException(message, e);
+ }
+ return new AvroSkipBytesConverter(schema, avroSkipBytes);
+ case NONE: return new AvroBytesConverter(schema);
+ default: throw new SerDeException("Value of " + avroBytesConverterPropertyName + " was invalid.");
+ }
+ }
+
  @Override public Class<? extends Writable> getSerializedClass() {
return delegateSerDe.getSerializedClass();
}
@@ -327,7 +362,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
K getWritable(byte[] value);
}
-  private static class AvroBytesConverter implements BytesConverter<AvroGenericRecordWritable> {
+  static class AvroBytesConverter implements BytesConverter<AvroGenericRecordWritable> {
private final Schema schema;
    private final Schema schema;
    private final DatumReader<GenericRecord> dataReader;
    private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>();
@@ -354,12 +389,18 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
return valueBytes;
}
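+    /** Builds the Avro {@link Decoder} for a message's bytes; subclasses may override to pre-process the payload. */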
+ Decoder getDecoder(byte[] value) throws SerDeException {
+ return DecoderFactory.get().binaryDecoder(value, null);
+ }
+
@Override public AvroGenericRecordWritable getWritable(byte[] value) {
GenericRecord avroRecord = null;
try {
- avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+ avroRecord = dataReader.read(null, getDecoder(value));
} catch (IOException e) {
Throwables.propagate(new SerDeException(e));
+ } catch (SerDeException e) {
+ Throwables.propagate(e);
}
avroGenericRecordWritable.setRecord(avroRecord);
@@ -369,6 +410,30 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
}
}
+  /**
+   * Avro converter which skips the first {@code skipBytes} bytes of each message.
+   *
+   * This may be needed for various serializers, such as the Confluent Avro serializer, which prefixes each
+   * message with five bytes: a magic byte followed by a four-byte schema ID.
+   */
+ static class AvroSkipBytesConverter extends AvroBytesConverter {
+ private final int skipBytes;
+
+ AvroSkipBytesConverter(Schema schema, int skipBytes) {
+ super(schema);
+ this.skipBytes = skipBytes;
+ }
+
+ @Override
+ Decoder getDecoder(byte[] value) throws SerDeException {
+ try {
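+        // Decode only the bytes after the first skipBytes bytes; the remainder of the payload is the Avro record.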
+ return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new SerDeException("Skip bytes value is larger than the message length.", e);
+ }
+ }
+ }
+
  private static class BytesWritableConverter implements BytesConverter<BytesWritable> {
@Override public byte[] getBytes(BytesWritable writable) {
return writable.getBytes();
diff --git kafka-handler/src/test/gen/org/apache/hadoop/hive/kafka/SimpleRecord.java kafka-handler/src/test/gen/org/apache/hadoop/hive/kafka/SimpleRecord.java
new file mode 100644
index 0000000..112ee71
--- /dev/null
+++ kafka-handler/src/test/gen/org/apache/hadoop/hive/kafka/SimpleRecord.java
@@ -0,0 +1,308 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class SimpleRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = 5535943992250633953L;
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.hadoop.hive.kafka\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+  private static final BinaryMessageEncoder<SimpleRecord> ENCODER =
+      new BinaryMessageEncoder<SimpleRecord>(MODEL$, SCHEMA$);
+
+  private static final BinaryMessageDecoder<SimpleRecord> DECODER =
+      new BinaryMessageDecoder<SimpleRecord>(MODEL$, SCHEMA$);
+
+ /**
+ * Return the BinaryMessageDecoder instance used by this class.
+ */
+  public static BinaryMessageDecoder<SimpleRecord> getDecoder() {
+ return DECODER;
+ }
+
+ /**
+ * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+ * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+ */
+  public static BinaryMessageDecoder<SimpleRecord> createDecoder(SchemaStore resolver) {
+    return new BinaryMessageDecoder<SimpleRecord>(MODEL$, SCHEMA$, resolver);
+ }
+
+ /** Serializes this SimpleRecord to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a SimpleRecord from a ByteBuffer. */
+ public static SimpleRecord fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
+ @Deprecated public java.lang.String id;
+ @Deprecated public java.lang.String name;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use newBuilder().
+ */
+ public SimpleRecord() {}
+
+ /**
+ * All-args constructor.
+ * @param id The new value for id
+ * @param name The new value for name
+ */
+ public SimpleRecord(java.lang.String id, java.lang.String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return id;
+ case 1: return name;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: id = (java.lang.String)value$; break;
+ case 1: name = (java.lang.String)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'id' field.
+ * @return The value of the 'id' field.
+ */
+ public java.lang.String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the value of the 'id' field.
+ * @param value the value to set.
+ */
+ public void setId(java.lang.String value) {
+ this.id = value;
+ }
+
+ /**
+ * Gets the value of the 'name' field.
+ * @return The value of the 'name' field.
+ */
+ public java.lang.String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the value of the 'name' field.
+ * @param value the value to set.
+ */
+ public void setName(java.lang.String value) {
+ this.name = value;
+ }
+
+ /**
+ * Creates a new SimpleRecord RecordBuilder.
+ * @return A new SimpleRecord RecordBuilder
+ */
+ public static org.apache.hadoop.hive.kafka.SimpleRecord.Builder newBuilder() {
+ return new org.apache.hadoop.hive.kafka.SimpleRecord.Builder();
+ }
+
+ /**
+ * Creates a new SimpleRecord RecordBuilder by copying an existing Builder.
+ * @param other The existing builder to copy.
+ * @return A new SimpleRecord RecordBuilder
+ */
+ public static org.apache.hadoop.hive.kafka.SimpleRecord.Builder newBuilder(org.apache.hadoop.hive.kafka.SimpleRecord.Builder other) {
+ return new org.apache.hadoop.hive.kafka.SimpleRecord.Builder(other);
+ }
+
+ /**
+ * Creates a new SimpleRecord RecordBuilder by copying an existing SimpleRecord instance.
+ * @param other The existing instance to copy.
+ * @return A new SimpleRecord RecordBuilder
+ */
+ public static org.apache.hadoop.hive.kafka.SimpleRecord.Builder newBuilder(org.apache.hadoop.hive.kafka.SimpleRecord other) {
+ return new org.apache.hadoop.hive.kafka.SimpleRecord.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for SimpleRecord instances.
+ */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<SimpleRecord>
+    implements org.apache.avro.data.RecordBuilder<SimpleRecord> {
+
+ private java.lang.String id;
+ private java.lang.String name;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(SCHEMA$);
+ }
+
+ /**
+ * Creates a Builder by copying an existing Builder.
+ * @param other The existing Builder to copy.
+ */
+ private Builder(org.apache.hadoop.hive.kafka.SimpleRecord.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.id)) {
+ this.id = data().deepCopy(fields()[0].schema(), other.id);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.name)) {
+ this.name = data().deepCopy(fields()[1].schema(), other.name);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /**
+ * Creates a Builder by copying an existing SimpleRecord instance
+ * @param other The existing instance to copy.
+ */
+ private Builder(org.apache.hadoop.hive.kafka.SimpleRecord other) {
+ super(SCHEMA$);
+ if (isValidValue(fields()[0], other.id)) {
+ this.id = data().deepCopy(fields()[0].schema(), other.id);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.name)) {
+ this.name = data().deepCopy(fields()[1].schema(), other.name);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /**
+ * Gets the value of the 'id' field.
+ * @return The value.
+ */
+ public java.lang.String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the value of the 'id' field.
+ * @param value The value of 'id'.
+ * @return This builder.
+ */
+ public org.apache.hadoop.hive.kafka.SimpleRecord.Builder setId(java.lang.String value) {
+ validate(fields()[0], value);
+ this.id = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'id' field has been set.
+ * @return True if the 'id' field has been set, false otherwise.
+ */
+ public boolean hasId() {
+ return fieldSetFlags()[0];
+ }
+
+
+ /**
+ * Clears the value of the 'id' field.
+ * @return This builder.
+ */
+ public org.apache.hadoop.hive.kafka.SimpleRecord.Builder clearId() {
+ id = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /**
+ * Gets the value of the 'name' field.
+ * @return The value.
+ */
+ public java.lang.String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the value of the 'name' field.
+ * @param value The value of 'name'.
+ * @return This builder.
+ */
+ public org.apache.hadoop.hive.kafka.SimpleRecord.Builder setName(java.lang.String value) {
+ validate(fields()[1], value);
+ this.name = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'name' field has been set.
+ * @return True if the 'name' field has been set, false otherwise.
+ */
+ public boolean hasName() {
+ return fieldSetFlags()[1];
+ }
+
+
+ /**
+ * Clears the value of the 'name' field.
+ * @return This builder.
+ */
+ public org.apache.hadoop.hive.kafka.SimpleRecord.Builder clearName() {
+ name = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public SimpleRecord build() {
+ try {
+ SimpleRecord record = new SimpleRecord();
+ record.id = fieldSetFlags()[0] ? this.id : (java.lang.String) defaultValue(fields()[0]);
+ record.name = fieldSetFlags()[1] ? this.name : (java.lang.String) defaultValue(fields()[1]);
+ return record;
+ } catch (java.lang.Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumWriter<SimpleRecord>
+    WRITER$ = (org.apache.avro.io.DatumWriter<SimpleRecord>)MODEL$.createDatumWriter(SCHEMA$);
+
+ @Override public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ WRITER$.write(this, SpecificData.getEncoder(out));
+ }
+
+ @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumReader<SimpleRecord>
+    READER$ = (org.apache.avro.io.DatumReader<SimpleRecord>)MODEL$.createDatumReader(SCHEMA$);
+
+ @Override public void readExternal(java.io.ObjectInput in)
+ throws java.io.IOException {
+ READER$.read(this, SpecificData.getDecoder(in));
+ }
+
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
new file mode 100644
index 0000000..da1108f
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+
+import org.apache.avro.Schema;
+
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Test class for Hive Kafka Avro SerDe with variable bytes skipped.
+ */
+public class AvroBytesConverterTest {
+ private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build();
+ private static byte[] simpleRecordConfluentBytes;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ /**
+ * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord.
+ */
+ @BeforeClass
+ public static void setUp() {
+    Map<String, Object> config = Maps.newHashMap();
+ config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
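+    // The registry URL satisfies the serializer config but is never contacted, since a MockSchemaRegistryClient is injected below.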
+ KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+ avroSerializer.configure(config, false);
+ simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord);
+ }
+
+ private void runConversionTest(Properties tbl, byte[] serializedSimpleRecord) throws SerDeException {
+ KafkaSerDe serde = new KafkaSerDe();
+ Schema schema = SimpleRecord.getClassSchema();
+ KafkaSerDe.AvroBytesConverter conv = (KafkaSerDe.AvroBytesConverter)serde.getByteConverterForAvroDelegate(
+ schema, tbl);
+ AvroGenericRecordWritable simpleRecordWritable = conv.getWritable(serializedSimpleRecord);
+
+ Assert.assertNotNull(simpleRecordWritable);
+ Assert.assertEquals(SimpleRecord.class, simpleRecordWritable.getRecord().getClass());
+
+ SimpleRecord simpleRecordDeserialized = (SimpleRecord) simpleRecordWritable.getRecord();
+
+ Assert.assertNotNull(simpleRecordDeserialized);
+ Assert.assertEquals(simpleRecord, simpleRecordDeserialized);
+ }
+
+ /**
+ * Tests the default case of no skipped bytes per record works properly.
+ */
+ @Test
+ public void convertWithAvroBytesConverter() throws SerDeException {
+ // Since the serialized version was created by Confluent,
+ // let's remove the first five bytes to get the actual message.
+ int recordLength = simpleRecordConfluentBytes.length;
+ byte[] simpleRecordWithNoOffset = Arrays.copyOfRange(simpleRecordConfluentBytes, 5, recordLength);
+
+ Properties tbl = new Properties();
+ tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "NONE");
+
+ runConversionTest(tbl, simpleRecordWithNoOffset);
+ }
+
+ /**
+ * Tests that the skip converter skips 5 bytes properly, which matches what Confluent needs.
+ */
+ @Test
+ public void convertWithConfluentAvroBytesConverter() throws SerDeException {
+ Integer offset = 5;
+
+ Properties tbl = new Properties();
+ tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP");
+ tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString());
+
+ runConversionTest(tbl, simpleRecordConfluentBytes);
+ }
+
+ /**
+ * Tests that the skip converter skips a custom number of bytes properly.
+ */
+ @Test
+ public void convertWithCustomAvroSkipBytesConverter() throws SerDeException {
+ Integer offset = 2;
+ // Remove all but two bytes of the five byte offset which Confluent adds,
+ // to simulate a message with only 2 bytes in front of each message.
+ int recordLength = simpleRecordConfluentBytes.length;
+ byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecordConfluentBytes, 5 - offset, recordLength);
+
+ Properties tbl = new Properties();
+    tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP");
+    tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString());
+
+ runConversionTest(tbl, simpleRecordAsOffsetBytes);
+ }
+
+ /**
+ * Test that when we skip more bytes than are in the message, we throw an exception properly.
+ */
+ @Test
+ public void skipBytesLargerThanMessageSizeConverter() throws SerDeException {
+    // The serialized SimpleRecord is only a handful of bytes long (two short strings plus the five-byte
+    // Confluent header), so a skip value of 190 is guaranteed to exceed the message length.
+    Integer offset = 190;
+
+ Properties tbl = new Properties();
+ tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName(), "SKIP");
+ tbl.setProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName(), offset.toString());
+
+ exception.expect(RuntimeException.class);
+ exception.expectMessage("org.apache.hadoop.hive.serde2.SerDeException: " +
+ "Skip bytes value is larger than the message length.");
+ runConversionTest(tbl, simpleRecordConfluentBytes);
+ }
+
+ /**
+ * Test that we properly parse the converter type, no matter the casing.
+ */
+ @Test
+ public void bytesConverterTypeParseTest() {
+    Map<String, KafkaSerDe.BytesConverterType> testCases = new HashMap<String, KafkaSerDe.BytesConverterType>() {{
+ put("skip", KafkaSerDe.BytesConverterType.SKIP);
+ put("sKIp", KafkaSerDe.BytesConverterType.SKIP);
+ put("SKIP", KafkaSerDe.BytesConverterType.SKIP);
+ put(" skip ", KafkaSerDe.BytesConverterType.SKIP);
+ put("SKIP1", KafkaSerDe.BytesConverterType.NONE);
+ put("skipper", KafkaSerDe.BytesConverterType.NONE);
+ put("", KafkaSerDe.BytesConverterType.NONE);
+ put(null, KafkaSerDe.BytesConverterType.NONE);
+ put("none", KafkaSerDe.BytesConverterType.NONE);
+ put("NONE", KafkaSerDe.BytesConverterType.NONE);
+ }};
+
+    for (Map.Entry<String, KafkaSerDe.BytesConverterType> entry : testCases.entrySet()) {
+ Assert.assertEquals(entry.getValue(), KafkaSerDe.BytesConverterType.fromString(entry.getKey()));
+ }
+ }
+}
diff --git kafka-handler/src/test/resources/SimpleRecord.avsc kafka-handler/src/test/resources/SimpleRecord.avsc
new file mode 100644
index 0000000..47b6156
--- /dev/null
+++ kafka-handler/src/test/resources/SimpleRecord.avsc
@@ -0,0 +1,13 @@
+{
+ "type" : "record",
+ "name" : "SimpleRecord",
+ "namespace" : "org.apache.hadoop.hive.kafka",
+ "fields" : [ {
+ "name" : "id",
+ "type" : "string"
+ }, {
+ "name" : "name",
+ "type" : "string"
+ }
+ ]
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index d16abdb..b540073 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -68,6 +68,8 @@
SCHEMA_NAME("avro.schema.name"),
SCHEMA_DOC("avro.schema.doc"),
AVRO_SERDE_SCHEMA("avro.serde.schema"),
+ AVRO_SERDE_TYPE("avro.serde.type"),
+ AVRO_SERDE_SKIP_BYTES("avro.serde.skip.bytes"),
SCHEMA_RETRIEVER("avro.schema.retriever");
private final String propName;