KAFKA-13148: KRaft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: controller, kraft
    • Labels: None

      Description

      In some cases the RaftClient will return Long.MAX_VALUE from scheduleAppend and scheduleAtomicAppend:

            /**
             * Append a list of records to the log. The write will be scheduled for some time
             * in the future. There is no guarantee that appended records will be written to
             * the log and eventually committed. However, it is guaranteed that if any of the
             * records become committed, then all of them will be.
             *
             * If the provided current leader epoch does not match the current epoch, which
             * is possible when the state machine has yet to observe the epoch change, then
             * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
             * not possible to become committed. The state machine is expected to discard all
             * uncommitted entries after observing an epoch change.
             *
             * @param epoch the current leader epoch
             * @param records the list of records to append
       * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records
       *         could not be committed; null if no memory could be allocated for the batch at
       *         this time
       * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the
       *         records is greater than the maximum batch size; if this exception is thrown,
       *         none of the elements in records were committed
             */
            Long scheduleAtomicAppend(int epoch, List<T> records);
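
      Read together, the contract has three distinct outcomes the caller must
      handle: a real offset, Long.MAX_VALUE when the supplied epoch is stale, and
      null when no memory is available for the batch. A minimal sketch of
      exhaustive handling (the raftClient field and the backOffAndRetry,
      discardUncommittedState, and trackPendingWrite helpers are hypothetical,
      used only to mark each branch):

            // Sketch only: exhaustive handling of the three documented outcomes
            // of scheduleAtomicAppend. The helpers are hypothetical placeholders
            // for whatever recovery the caller needs.
            void appendAtomically(int epoch, List<ApiMessageAndVersion> records) {
                Long offset = raftClient.scheduleAtomicAppend(epoch, records);
                if (offset == null) {
                    // No memory could be allocated for the batch at this time.
                    backOffAndRetry(records);
                } else if (offset == Long.MAX_VALUE) {
                    // The epoch is stale; none of these records can ever commit.
                    // Discard uncommitted state after observing the epoch change.
                    discardUncommittedState();
                } else {
                    // Normal case: the last record is expected to commit here.
                    trackPendingWrite(offset);
                }
            }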
       

      The controller doesn't handle this case:

                        // If the operation returned a batch of records, those records need to be
                        // written before we can return our result to the user.  Here, we hand off
                        // the batch of records to the raft client.  They will be written out
                        // asynchronously.
                        final long offset;
                        if (result.isAtomic()) {
                            offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
                        } else {
                            offset = raftClient.scheduleAppend(controllerEpoch, result.records());
                        }
                        op.processBatchEndOffset(offset);
                        writeOffset = offset;
                        resultAndOffset = ControllerResultAndOffset.of(offset, result);
                        for (ApiMessageAndVersion message : result.records()) {
                            replay(message.message(), Optional.empty(), offset);
                        }
                        snapshotRegistry.getOrCreateSnapshot(offset);
                        log.debug("Read-write operation {} will be completed when the log " +
                            "reaches offset {}.", this, resultAndOffset.offset());
       

       


    People

    • Assignee: Niket Goel
    • Reporter: Jose Armando Garcia Sancio (jagsancio)
    • Votes: 0
    • Watchers: 3
