diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 6961494..42396b5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -470,29 +470,33 @@ class WorkerSinkTask extends WorkerTask {
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         origOffsets.clear();
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
-            log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
-            SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
-            SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
-            Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
-            SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
-                    keyAndSchema.schema(), keyAndSchema.value(),
-                    valueAndSchema.schema(), valueAndSchema.value(),
-                    msg.offset(),
-                    timestamp,
-                    msg.timestampType());
-            log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
-            SinkRecord transRecord = transformationChain.apply(origRecord);
-            origOffsets.put(
-                    new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
-                    new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
-            );
-            if (transRecord != null) {
-                messageBatch.add(transRecord);
-            } else {
-                log.trace("{} Transformations returned null, so dropping record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
+            try {
+                log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}",
+                        this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
+                SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
+                SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
+                Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
+                SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
+                        keyAndSchema.schema(), keyAndSchema.value(),
+                        valueAndSchema.schema(), valueAndSchema.value(),
+                        msg.offset(),
+                        timestamp,
+                        msg.timestampType());
+                log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                         this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
+                SinkRecord transRecord = transformationChain.apply(origRecord);
+                origOffsets.put(
+                        new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
+                        new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
+                );
+                if (transRecord != null) {
+                    messageBatch.add(transRecord);
+                } else {
+                    log.trace("{} Transformations returned null, so dropping record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
+                            this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
+                }
+            } catch(DataException e) {
+                log.warn("Error converting message: {}", e.getMessage());
             }
         }
         sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
