Details
-
Bug
-
Status: Open
-
Minor
-
Resolution: Unresolved
-
0.9.0.1, 0.11.0.0
-
None
-
None
Description
ConnectRecord.equals() isn't handling comparison properly when the key or value is an array (a byte array, for instance). The following test will fail because ConnectRecord is using .equals() to compare two byte arrays, which is doing an identity check instead of comparing the arrays themselves.
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; public class SourceRecordTest { @Test public void testEquals() { byte[] firstBytes = "first".getBytes(); byte[] secondBytes = "first".getBytes(); SourceRecord firstRecord = new SourceRecord(Collections.EMPTY_MAP, Collections.EMPTY_MAP,"topic", 1, Schema.BYTES_SCHEMA, firstBytes); SourceRecord secondRecord = new SourceRecord(Collections.EMPTY_MAP, Collections.EMPTY_MAP,"topic", 1, Schema.BYTES_SCHEMA, firstBytes); SourceRecord thirdRecord = new SourceRecord(Collections.EMPTY_MAP, Collections.EMPTY_MAP,"topic", 1, Schema.BYTES_SCHEMA, secondBytes); assertThat(firstRecord).isEqualTo(secondRecord); assertThat(firstRecord).isEqualTo(thirdRecord); } }
As a result, I have a failing unit test that should otherwise pass:
List<SourceRecord> sourceRecords = task.poll(); final SourceRecord expectedRecord = new SourceRecord( ImmutableMap.of(JdbcEventSourceTask.PARTITION_DATABASE, JDBC_URL), ImmutableMap.of(JdbcEventSourceTask.OFFSET_ID, publishedId), "topicname", 1, Schema.BYTES_SCHEMA, "MessageKey".getBytes(), Schema.BYTES_SCHEMA, "MessageValue".getBytes()); assertThat(sourceRecords).containsOnly(expectedRecord);
The workaround at the moment is to implement a custom Comparator<SourceRecord> instance to use with assertj. But I wonder if there's anything in Kafka Connect itself that may be affected by this issue.
The code causing the issue can be seen [here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L108-L109) (keys) and [here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L112-L113) (values). The comparison there should use (when arrays are present) java.util.Arrays.equals(byte[], byte[]) to perform an equality check instead of an identity check, assuming that's the desired behavior.