diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index 0c11a04..c94ac74 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -46,8 +46,14 @@ public class Schema extends Type { */ public Object read(ByteBuffer buffer) { Object[] objects = new Object[fields.length]; - for (int i = 0; i < fields.length; i++) - objects[i] = fields[i].type.read(buffer); + for (int i = 0; i < fields.length; i++) { + try { + objects[i] = fields[i].type.read(buffer); + } catch (Exception e) { + throw new SchemaException("Error reading field '" + fields[i].name + "': " + e.getMessage() == null ? e.getMessage() + : e.getClass().getName()); + } + } return new Struct(this, objects); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 24ac060..ffc3d25 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -4,7 +4,6 @@ import java.nio.ByteBuffer; import org.apache.kafka.common.utils.Utils; - /** * A serializable type */