Kafka / KAFKA-5773

ConnectRecord.equals() doesn't properly handle array keys/values


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0.1, 0.11.0.0
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

      ConnectRecord.equals() does not compare keys or values correctly when they are arrays (byte arrays, for instance). The following test fails because ConnectRecord compares the two byte arrays with .equals(), which Java arrays inherit unchanged from Object, so it performs an identity check instead of comparing the arrays' contents.

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.junit.Test;
      
      import java.util.Collections;
      
      import static org.assertj.core.api.Assertions.assertThat;
      
      public class SourceRecordTest {
          @Test
          public void testEquals() {
              // Two distinct byte[] instances with identical contents.
              byte[] firstBytes = "first".getBytes();
              byte[] secondBytes = "first".getBytes();
      
              SourceRecord firstRecord = new SourceRecord(Collections.EMPTY_MAP,
                      Collections.EMPTY_MAP, "topic", 1, Schema.BYTES_SCHEMA, firstBytes);
              // Shares the same array instance as firstRecord.
              SourceRecord secondRecord = new SourceRecord(Collections.EMPTY_MAP,
                      Collections.EMPTY_MAP, "topic", 1, Schema.BYTES_SCHEMA, firstBytes);
              // Equal contents, but a different array instance.
              SourceRecord thirdRecord = new SourceRecord(Collections.EMPTY_MAP,
                      Collections.EMPTY_MAP, "topic", 1, Schema.BYTES_SCHEMA, secondBytes);
      
              assertThat(firstRecord).isEqualTo(secondRecord); // passes: same array instance
              assertThat(firstRecord).isEqualTo(thirdRecord);  // fails: byte[] compared by identity
          }
      }
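The root cause can be reproduced in plain Java without any Kafka dependency. The sketch below (the class name ArrayEqualityDemo is hypothetical) shows that two getBytes() calls return distinct arrays, that equals() and Objects.equals() treat them as different because arrays inherit identity-based equals() from Object, and that Arrays.equals() and Objects.deepEquals() compare contents instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical demo class; it only uses the JDK.
public class ArrayEqualityDemo {
    public static void main(String[] args) {
        byte[] first = "first".getBytes(StandardCharsets.UTF_8);
        byte[] second = "first".getBytes(StandardCharsets.UTF_8);

        // Each getBytes() call allocates a new array, so these are distinct objects.
        System.out.println(first == second);                   // false
        // Arrays do not override Object.equals(), so this is an identity check.
        System.out.println(first.equals(second));              // false
        // Objects.equals() delegates to equals(), so it has the same problem.
        System.out.println(Objects.equals(first, second));     // false
        // Content-based comparisons treat the two arrays as equal.
        System.out.println(Arrays.equals(first, second));      // true
        System.out.println(Objects.deepEquals(first, second)); // true
        // Arrays.hashCode() is content-based too, unlike the arrays' own hashCode().
        System.out.println(Arrays.hashCode(first) == Arrays.hashCode(second)); // true
    }
}
```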
      

      As a result, I have a failing unit test that should otherwise pass:

                  List<SourceRecord> sourceRecords = task.poll();
                  final SourceRecord expectedRecord = new SourceRecord(
                          ImmutableMap.of(JdbcEventSourceTask.PARTITION_DATABASE, JDBC_URL),
                          ImmutableMap.of(JdbcEventSourceTask.OFFSET_ID, publishedId),
                          "topicname",
                          1,
                          Schema.BYTES_SCHEMA, "MessageKey".getBytes(),
                          Schema.BYTES_SCHEMA, "MessageValue".getBytes());
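                  // Fails even when poll() returns equal contents: the byte[] key and value are compared by identity.
                  assertThat(sourceRecords).containsOnly(expectedRecord);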
      

      The current workaround is to implement a custom Comparator<SourceRecord> for use with AssertJ. I also wonder whether anything in Kafka Connect itself is affected by this issue.

      The code causing the issue is [here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L108-L109) for keys and [here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L112-L113) for values. When the key or value is an array, the comparison there should use java.util.Arrays.equals(byte[], byte[]) to compare contents instead of identity, assuming that is the desired behavior.
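A minimal sketch of the fix proposed above, assuming the key and value may be any Object. It uses Objects.deepEquals(), which handles byte[] and every other array type and otherwise falls back to equals(), so it is somewhat more general than Arrays.equals(byte[], byte[]). Because equal objects must have equal hash codes, a content-based equals() also needs a matching content-based hashCode(). SimpleRecord is a hypothetical stand-in for ConnectRecord reduced to three fields, and fieldEquals()/fieldHashCode() are hypothetical helpers, not Kafka Connect API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public class DeepEqualsRecordSketch {

    // Null-safe, content-based equality. Objects.deepEquals() compares byte[]
    // (and any other array type) element by element and falls back to
    // a.equals(b) otherwise.
    static boolean fieldEquals(Object a, Object b) {
        return Objects.deepEquals(a, b);
    }

    // Hash code consistent with fieldEquals(): wrapping the value in a
    // one-element Object[] lets Arrays.deepHashCode() pick a content-based
    // hash for arrays (e.g. Arrays.hashCode(byte[])) and hashCode() otherwise.
    static int fieldHashCode(Object o) {
        return o == null ? 0 : Arrays.deepHashCode(new Object[] {o});
    }

    // Hypothetical stand-in for ConnectRecord, reduced to topic, key and value.
    static final class SimpleRecord {
        final String topic;
        final Object key;
        final Object value;

        SimpleRecord(String topic, Object key, Object value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            SimpleRecord that = (SimpleRecord) o;
            return Objects.equals(topic, that.topic)
                    && fieldEquals(key, that.key)
                    && fieldEquals(value, that.value);
        }

        @Override
        public int hashCode() {
            int result = Objects.hashCode(topic);
            result = 31 * result + fieldHashCode(key);
            result = 31 * result + fieldHashCode(value);
            return result;
        }
    }

    public static void main(String[] args) {
        SimpleRecord a = new SimpleRecord("topic",
                "MessageKey".getBytes(StandardCharsets.UTF_8),
                "MessageValue".getBytes(StandardCharsets.UTF_8));
        SimpleRecord b = new SimpleRecord("topic",
                "MessageKey".getBytes(StandardCharsets.UTF_8),
                "MessageValue".getBytes(StandardCharsets.UTF_8));
        // Distinct arrays with equal contents now compare equal and hash alike.
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```

This only deep-compares arrays that are the key or value itself; a byte[] nested inside a List or Map value is still compared by that collection's own equals(), which checks the array by identity.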


    People

    • Assignee: Unassigned
    • Reporter: Ryan Kennedy (rckenned)
    • Votes: 0
    • Watchers: 2
