From a6df2651eb2d964affb38525f8461fd57c95a611 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 4 Mar 2015 10:37:48 -0800 Subject: [PATCH] KAFKA-1863.v1 --- .../apache/kafka/clients/producer/Callback.java | 19 +++++++++++++++++++ .../kafka/clients/producer/internals/Sender.java | 1 + .../apache/kafka/common/protocol/types/Field.java | 5 +++++ .../apache/kafka/common/protocol/types/Schema.java | 5 +++++ .../common/protocol/types/SchemaException.java | 3 +++ .../apache/kafka/common/protocol/types/Struct.java | 10 ++++++++++ .../apache/kafka/common/protocol/types/Type.java | 22 ++++++++++++++++++++-- 7 files changed, 63 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java index b89aa58..7caefc3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java @@ -29,6 +29,25 @@ public interface Callback { * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error * occurred. * @param exception The exception thrown during processing of this record. Null if no error occurred. + * Possible thrown exceptions include: + * + * Non-Retriable exceptions (fatal, the message will never be sent): + * + * InvalidTopicException + * OffsetMetadataTooLargeException + * RecordBatchTooLargeException + * RecordTooLargeException + * UnknownServerException + * + * Retriable exceptions (transient, may be covered by increasing #.retries): + * + * CorruptRecordException + * InvalidMetadataException + * NotEnoughReplicasAfterAppendException + * NotEnoughReplicasException + * OffsetOutOfRangeException + * TimeoutException + * UnknownTopicOrPartitionException */ public void onCompletion(RecordMetadata metadata, Exception exception); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index ed9c63a..7eb9cb5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -136,6 +136,7 @@ public class Sender implements Runnable { try { run(time.milliseconds()); } catch (Exception e) { + // any SchemaException will also be caught here log.error("Uncaught error in kafka producer I/O thread: ", e); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java index 8991958..1eb1195 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java @@ -30,6 +30,11 @@ public class Field { public final String doc; final Schema schema; + /** + * Create the field. + * + * @throws SchemaException If the default value is not primitive and the validation fails + */ public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) { this.index = index; this.name = name; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index 7164701..cfbe4b4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -24,6 +24,11 @@ public class Schema extends Type { private final Field[] fields; private final Map fieldsByName; + /** + * Construct the schema with the given a list of its field values + * + * @throws SchemaException If the given list have duplicate fields + */ public Schema(Field... fs) { this.fields = new Field[fs.length]; this.fieldsByName = new HashMap(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java index ea4e46f..58b685b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.protocol.types; import org.apache.kafka.common.KafkaException; +/** + * Thrown if the protocol schema validation fails while parsing request or response. + */ public class SchemaException extends KafkaException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index ff89f0e..7672a3a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -60,6 +60,7 @@ public class Struct { * * @param field The field to look up * @return The value for that field. + * @throws SchemaException if the field has no value and has no default. */ public Object get(Field field) { validateField(field); @@ -71,6 +72,7 @@ public class Struct { * * @param name The name of the field * @return The value in the record + * @throws SchemaException If no such field exists */ public Object get(String name) { Field field = schema.get(name); @@ -149,6 +151,7 @@ public class Struct { * * @param field The field * @param value The value + * @throws SchemaException If the validation of the field failed */ public Struct set(Field field, Object value) { validateField(field); @@ -161,6 +164,7 @@ public class Struct { * * @param name The name of the field * @param value The value to set + * @throws SchemaException If the field is not known */ public Struct set(String name, Object value) { Field field = this.schema.get(name); @@ -177,6 +181,7 @@ public class Struct { * * @param field The field to create an instance of * @return The struct + * @throws SchemaException If the given field is not a container type */ public Struct instance(Field field) { validateField(field); @@ -195,6 +200,7 @@ public class Struct { * * @param field The name of the field to create (field must be a schema type) * @return The struct + * @throws SchemaException If the given field is not a container type */ public Struct instance(String field) { return instance(schema.get(field)); @@ -223,6 +229,8 @@ public class Struct { /** * Ensure the user doesn't try to access fields from the wrong schema + * + * @throws SchemaException If validation fails */ private void validateField(Field field) { if (this.schema != field.schema) @@ -233,6 +241,8 @@ public class Struct { /** * Validate the contents of this struct against its schema + * + * @throws SchemaException If validation fails */ public void validate() { this.schema.validate(this); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index f0d5a82..9ea28b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -25,14 +25,32 @@ import org.apache.kafka.common.utils.Utils; */ public abstract class Type { + /** + * Write the typed object to the buffer + * + * @throws SchemaException If the object is not valid for its type + */ public abstract void write(ByteBuffer buffer, Object o); + /** + * Read the typed object from the buffer + * + * @throws SchemaException If the object is not valid for its type + */ public abstract Object read(ByteBuffer buffer); - public abstract int sizeOf(Object o); - + /** + * Validate the object. If succeeded return its typed object. + * + * @throws SchemaException If validation failed + */ public abstract Object validate(Object o); + /** + * Return the size of the object in bytes + */ + public abstract int sizeOf(Object o); + public static final Type INT8 = new Type() { @Override public void write(ByteBuffer buffer, Object o) { -- 1.7.12.4