Kafka / KAFKA-13694

Some InvalidRecordException messages are thrown away

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.2.0
    • Component/s: core
    • Labels: None

    Description

      1. Example

      Topic-level config: "cleanup.policy": "compact"

      However, the ProducerRecord sent by the producer does not specify a key.

       

      producer.log

      [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or more records have been rejected 

       

       

      server.log

      [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
      org.apache.kafka.common.InvalidRecordException: One or more records have been rejected 

      From the producer and server logs, we cannot tell why the send failed, only that the server rejected the message.
      Compare this with the RecordTooLargeException test case: there, the producer log states the reason for the failure clearly, and the server does not print a log (the reason is explained later).
      producer_message_too_large.log:

      [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept.
      [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept. 

      2. Root cause

      ReplicaManager#appendToLocalLog(...) ->

      Partition#appendRecordsToLeader(...) ->

      UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->

      LogValidator#validateMessagesAndAssignOffsets(...) 

      1) Analysis of the validateMessagesAndAssignOffsets method:
      The LogValidator#validateRecord method calls validateKey and validateTimestamp and collects the errors for all messages as Seq[ApiRecordError].
      The subsequent processRecordErrors(recordErrors) method currently special-cases only Errors.INVALID_TIMESTAMP. Because the error returned by validateKey is the ordinary Errors.INVALID_RECORD, execution falls through to

      else {
        throw new RecordValidationException(new InvalidRecordException(
          "One or more records have been rejected"), errors)
      }

      In fact, the errors variable here contains the details of each record error, but that information is not included in the message of the InvalidRecordException.
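
The effect can be reproduced outside Kafka with a minimal, self-contained Java sketch. The classes and the `validateKeys` method below are simplified stand-ins invented for illustration, not the broker's actual Scala code, and the per-record message text is an assumption; only the generic rejection message is taken from this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the broker classes; not Kafka's real implementation.
final class GenericRejectionSketch {

    // Simplified model of a per-record validation error.
    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    // Carries a generic message plus the per-record errors.
    static final class RecordValidationException extends RuntimeException {
        final List<RecordError> recordErrors;

        RecordValidationException(String message, List<RecordError> recordErrors) {
            super(message);
            this.recordErrors = recordErrors;
        }
    }

    // Models key validation on a compacted topic: a null key is an error.
    static void validateKeys(List<String> keys) {
        List<RecordError> errors = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            if (keys.get(i) == null) {
                errors.add(new RecordError(i, "record has no key (illustrative message)"));
            }
        }
        if (!errors.isEmpty()) {
            // The detail held in `errors` never reaches the exception message.
            throw new RecordValidationException("One or more records have been rejected", errors);
        }
    }

    public static void main(String[] args) {
        try {
            validateKeys(Arrays.asList("k1", null));
        } catch (RecordValidationException e) {
            System.out.println(e.getMessage());        // generic text only
            System.out.println(e.recordErrors.size()); // the detail exists, but only on the side
        }
    }
}
```

Anyone who sees only `getMessage()`, as the producer and the server log do here, cannot tell that the missing key caused the rejection.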

      2) The exception thrown by processRecordErrors is caught by ReplicaManager#appendToLocalLog(...), so we continue with the exception-handling (catch) code of appendToLocalLog.

      This is also where we can see why the server does not print a log for RecordTooLargeException.

      In the case rve: RecordValidationException branch,
      the server prints the log via the processFailedRecord method
      and sends a response to the client via LogAppendResult.
      Both use only rve.invalidException.
      rve.recordErrors is neither printed by the server nor returned to the client.
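
The catch path described above can be modeled roughly as follows. This is an illustrative Java sketch under stated assumptions: `describeFailure` is a hypothetical helper standing in for both the logging and the response-building steps, and the exception class is a simplified stand-in rather than Kafka's real one.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the catch path; field names mirror the issue text,
// but the bodies are illustrative and not Kafka's real code.
final class CatchPathSketch {

    static final class RecordValidationException extends RuntimeException {
        final RuntimeException invalidException; // the generic wrapper exception
        final List<String> recordErrors;         // per-record detail

        RecordValidationException(RuntimeException invalidException, List<String> recordErrors) {
            super(invalidException);
            this.invalidException = invalidException;
            this.recordErrors = recordErrors;
        }
    }

    // Builds the text that would be logged and returned to the client.
    // Only invalidException is consulted; recordErrors is never read.
    static String describeFailure(RecordValidationException rve) {
        return rve.invalidException.getMessage();
    }

    public static void main(String[] args) {
        RecordValidationException rve = new RecordValidationException(
                new IllegalArgumentException("One or more records have been rejected"),
                Collections.singletonList("batchIndex=0: record has no key (illustrative)"));
        System.out.println(describeFailure(rve));
    }
}
```

Because the handler reads only the wrapped exception, any improvement has to put the detail into that exception's message, which is what the second solution does.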

      3. Solution
      There are two possible solutions; I prefer the second.

      1) As with Errors.INVALID_TIMESTAMP, have the validateKey method return Errors.INVALID_RECORD_WITHOUT_KEY,
      and add special handling for Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method.

      2) Change the processRecordErrors method so that it no longer distinguishes between types of Errors. Even if new INVALID_RECORD types are added in the future, we uniformly throw:

      throw new RecordValidationException(new InvalidRecordException(
        "One or more records have been rejected due to " + errors.toString()), errors) 

      The ProduceResponse.RecordError class also needs a toString() method:

      @Override
      public String toString() {
          return "RecordError("
                  + "batchIndex=" + batchIndex
                  + ", message=" + ((message == null) ? "null" : "'" + message + "'")
                  + ")";
      } 

      The toString method of ProduceResponse.PartitionResponse already calls the toString method of ProduceResponse.RecordError, but until now RecordError did not define toString.
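
To illustrate why the toString() override matters once the error list is concatenated into the message, here is a small self-contained Java sketch. `PlainRecordError` and `PrintableRecordError` are hypothetical stand-ins for ProduceResponse.RecordError without and with the proposed method, and the message value 'no key' is made up for the example.

```java
import java.util.Collections;
import java.util.List;

final class RecordErrorToStringSketch {

    // Stand-in without toString(): falls back to Object#toString.
    static final class PlainRecordError {
        final int batchIndex;
        final String message;

        PlainRecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    // Stand-in with the toString() proposed in this issue.
    static final class PrintableRecordError {
        final int batchIndex;
        final String message;

        PrintableRecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }

        @Override
        public String toString() {
            return "RecordError("
                    + "batchIndex=" + batchIndex
                    + ", message=" + ((message == null) ? "null" : "'" + message + "'")
                    + ")";
        }
    }

    // Mirrors the message construction from solution 2.
    static String rejectionMessage(List<?> errors) {
        return "One or more records have been rejected due to " + errors;
    }

    public static void main(String[] args) {
        // Prints a class name and hash code, which tells the reader nothing.
        System.out.println(rejectionMessage(
                Collections.singletonList(new PlainRecordError(0, "no key"))));
        // Prints: One or more records have been rejected due to [RecordError(batchIndex=0, message='no key')]
        System.out.println(rejectionMessage(
                Collections.singletonList(new PrintableRecordError(0, "no key"))));
    }
}
```

Without the override, the list renders each element through Object#toString, so the improved exception message would still hide the per-record reason.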

       

            People

              Assignee: RivenSun
              Reporter: RivenSun