Index: clients/src/main/java/org/apache/kafka/common/NewOffsetMetaData.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/common/NewOffsetMetaData.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) +++ clients/src/main/java/org/apache/kafka/common/NewOffsetMetaData.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -0,0 +1,31 @@ +package org.apache.kafka.common; + +/** + * leo: logEndOffset + * lso: logStartOffset + * lst: logStartTime + * let: logEndTime + */ +public class NewOffsetMetaData { + public final long leo; + public final int brokerid; + public final long let; + public final long lst; + public final long lso; + + public NewOffsetMetaData(int brokerid, long leo, long lst, long let, long lso) { + this.brokerid = brokerid; + this.leo = leo; + this.lst = lst; + this.let = let; + this.lso = lso; + } + + public String toString() { + java.text.SimpleDateFormat df = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String lstTime = df.format(this.lst); + String letTime = df.format(this.let); + return "[broker=" + this.brokerid + "]->[lst=" + this.lst + "(" + lstTime + "),lso=" + this.lso + + ",let=" + this.let + "(" + letTime + "), leo=" + this.leo + "]"; + } +} \ No newline at end of file Index: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -40,7 +40,8 @@ SASL_HANDSHAKE(17, "SaslHandshake"), API_VERSIONS(18, "ApiVersions"), CREATE_TOPICS(19, "CreateTopics"), - DELETE_TOPICS(20, "DeleteTopics"); + DELETE_TOPICS(20, "DeleteTopics"), + GET_START_OFFSET(21, "GetStartOffset"); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; Index: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -166,7 +166,9 @@ " the message was sent to an incompatible broker. 
See the broker logs for more details.")), UNSUPPORTED_FOR_MESSAGE_FORMAT(43, new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")), - POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")); + POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")), + OFFSET_LEO(45, null), + OFFSET_HW(46, null); private static final Logger log = LoggerFactory.getLogger(Errors.class); Index: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/common/record/FileRecords.java (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ clients/src/main/java/org/apache/kafka/common/record/FileRecords.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -79,6 +79,17 @@ shallowEntries = shallowEntriesFrom(start); } + public long creationTime() { + java.nio.file.attribute.BasicFileAttributes attributes = null; + try { + attributes = java.nio.file.Files.readAttributes(file.toPath(), + java.nio.file.attribute.BasicFileAttributes.class); + } catch (IOException exception) { + return -1; + } + return attributes.creationTime().to(java.util.concurrent.TimeUnit.MILLISECONDS); + } + @Override public int sizeInBytes() { return size.get(); Index: clients/src/main/java/org/apache/kafka/common/requests/GetStartOffsetRequest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/common/requests/GetStartOffsetRequest.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) +++ clients/src/main/java/org/apache/kafka/common/requests/GetStartOffsetRequest.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
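The creationTime() helper added to FileRecords above reads the segment file's creation timestamp through NIO and falls back to -1 when the attributes cannot be read; the new OFFSET_LEO(45)/OFFSET_HW(46) codes are plain markers that carry no exception. A minimal, self-contained sketch of the same attribute lookup (the segment file name is a placeholder):

```scala
import java.nio.file.{Files, Path, Paths}
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.TimeUnit

object SegmentCreationTime {
  // Same pattern as FileRecords.creationTime(): creation time in ms, or -1 on I/O error.
  def creationTimeMs(path: Path): Long =
    try {
      val attrs = Files.readAttributes(path, classOf[BasicFileAttributes])
      attrs.creationTime().to(TimeUnit.MILLISECONDS)
    } catch {
      case _: java.io.IOException => -1L
    }

  def main(args: Array[String]): Unit =
    println(creationTimeMs(Paths.get("00000000000000000000.log"))) // placeholder file name
}
```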
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.NewOffsetMetaData; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.util.*; + +public class GetStartOffsetRequest extends AbstractRequest { + private static final String TOPIC_DATA_KEY_NAME = "topic_data"; + + // topic level field names + private static final String TOPIC_KEY_NAME = "topic"; + private static final String PARTITION_DATA_KEY_NAME = "data"; + + // partition level field names + private static final String PARTITION_KEY_NAME = "partition"; + + // NewOffsetMetaData + private static final String LEO_KEY_NAME = "leo"; + private static final String LSO_KEY_NAME = "lso"; + + private static final String BROKERID_KEY_NAME = "brokerid"; + private static final String LST_KEY_NAME = "lst"; + private static final String LET_KEY_NAME = "let"; + + + + private final LinkedHashMap partitionOffsetMetaDatas; + + + public static class Builder extends AbstractRequest.Builder { + private LinkedHashMap partitionOffsetMetaDatas; + + public Builder(LinkedHashMap partitionOffsetMetaDatas) { + super(ApiKeys.GET_START_OFFSET); + this.partitionOffsetMetaDatas = partitionOffsetMetaDatas; + } + + @Override + public GetStartOffsetRequest build() { + short version = version(); + return new GetStartOffsetRequest(this.partitionOffsetMetaDatas, version); + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("(type=GetStartOffsetRequest"). + append(", partitionOffsetMetaDatas="); + if (partitionOffsetMetaDatas == null) { + bld.append(""); + } else { + bld.append(", partitionRecords=(").append(Utils.mkString(partitionOffsetMetaDatas)); + } + bld.append(")"); + return bld.toString(); + } + } + + /** + * In v0 null is not allowed and and empty list indicates requesting all topics. + * Note: modern clients do not support sending v0 requests. + * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics. 
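The Builder above takes the per-partition payload as a LinkedHashMap; judging from the Scala call sites later in the patch (AdminClient, SmartExtendManager), the element types are TopicPartition -> NewOffsetMetaData, with the generic parameters presumably lost in the diff rendering. A small sketch of assembling that payload (offsets and timestamps are invented):

```scala
import java.util
import org.apache.kafka.common.{NewOffsetMetaData, TopicPartition}

object BuildStartOffsetPayload {
  def main(args: Array[String]): Unit = {
    // NewOffsetMetaData(brokerid, leo, lst, let, lso) -- argument order as declared above
    val payload = new util.LinkedHashMap[TopicPartition, NewOffsetMetaData]()
    payload.put(new TopicPartition("test-topic", 0),
      new NewOffsetMetaData(1, 5000L, 1500000000000L, 1500000600000L, 100L))
    println(payload)
  }
}
```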
+ */ + + public GetStartOffsetRequest(Struct struct, short version) { + super(struct, version); + partitionOffsetMetaDatas = new LinkedHashMap<>(); + for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { + Struct topicData = (Struct) topicDataObj; + String topic = topicData.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + int brokerid = partitionResponse.getInt(BROKERID_KEY_NAME); + long leo = partitionResponse.getLong(LEO_KEY_NAME); + long lst = partitionResponse.getLong(LST_KEY_NAME); + long let = partitionResponse.getLong(LET_KEY_NAME); + long lso = partitionResponse.getLong(LSO_KEY_NAME); + NewOffsetMetaData newOffsetMetaData = new NewOffsetMetaData(brokerid, leo, lst, let, lso); + partitionOffsetMetaDatas.put(new TopicPartition(topic, partition), newOffsetMetaData); + } + } + } + + public GetStartOffsetRequest(LinkedHashMap partitionOffsetMetaDatas, short version) { + super(new Struct(ProtoUtils.requestSchema(ApiKeys.GET_START_OFFSET.id, version)), version); + List> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(partitionOffsetMetaDatas); + + List topicArray = new ArrayList<>(); + for (FetchRequest.TopicAndPartitionData topicEntry : topicsData) { + Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.topic); + List partitionArray = new ArrayList<>(); + for (Map.Entry partitionEntry : topicEntry.partitions.entrySet()) { + NewOffsetMetaData newOffsetMetaData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITION_DATA_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(LEO_KEY_NAME, newOffsetMetaData.leo); + partitionData.set(BROKERID_KEY_NAME, newOffsetMetaData.brokerid); + partitionData.set(LST_KEY_NAME, newOffsetMetaData.lst); + partitionData.set(LET_KEY_NAME, newOffsetMetaData.let); + partitionData.set(LSO_KEY_NAME, newOffsetMetaData.lso); + + partitionArray.add(partitionData); + } + topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPIC_DATA_KEY_NAME, topicArray.toArray()); + + this.partitionOffsetMetaDatas = partitionOffsetMetaDatas; + } + + @Override + public String toString() { + // Use the same format as `Struct.toString()` + StringBuilder bld = new StringBuilder(); + bld.append(",partitionOffsetMetaDatas=") + .append(Utils.mkString(partitionOffsetMetaDatas, "[", "]", "=", ",")) + .append("}"); + return bld.toString(); + } + + @Override + public AbstractResponse getErrorResponse(Throwable e) { + LinkedHashMap responseData = new LinkedHashMap<>(); + + for (Map.Entry entry: partitionOffsetMetaDatas.entrySet()) { + GetStartOffsetResponse.StartOffsetResponse partitionResponse = new GetStartOffsetResponse.StartOffsetResponse(Errors.UNKNOWN, -1); + responseData.put(entry.getKey(), partitionResponse); + } + short versionId = version(); + return new GetStartOffsetResponse(responseData, versionId); + } + + public int getBrokerId() { + if (!this.partitionOffsetMetaDatas.isEmpty()) { + return this.partitionOffsetMetaDatas.entrySet().iterator().next().getValue().brokerid; + } else { + return -1; + } + } + + public Map partitionRecordsOrFail() { + // Store it in a local variable to protect against concurrent updates + Map partitionOffsetMetaDatas = this.partitionOffsetMetaDatas; + if 
(partitionOffsetMetaDatas == null) { + throw new IllegalStateException("The partition records are no longer available because"); + } + return partitionOffsetMetaDatas; + } + + public static GetStartOffsetRequest parse(ByteBuffer buffer, int versionId) { + return new GetStartOffsetRequest(ProtoUtils.parseRequest(ApiKeys.GET_START_OFFSET.id, versionId, buffer), (short) versionId); + } + + public static GetStartOffsetRequest parse(ByteBuffer buffer) { + return parse(buffer, ProtoUtils.latestVersion(ApiKeys.GET_START_OFFSET.id)); + } +} Index: clients/src/main/java/org/apache/kafka/common/requests/GetStartOffsetResponse.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/common/requests/GetStartOffsetResponse.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) +++ clients/src/main/java/org/apache/kafka/common/requests/GetStartOffsetResponse.java (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This wrapper supports both v0 and v1 of ProduceResponse. 
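Note on the request's getErrorResponse above: every requested partition is answered with Errors.UNKNOWN and offset -1, and the follower-side retry loop (getBestOffset in ReplicaManager, further below) treats exactly that combination as a failed attempt. A small sketch of that error-path shape (partition name invented):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.GetStartOffsetResponse.StartOffsetResponse

object ErrorResponseShape {
  def main(args: Array[String]): Unit = {
    val tp = new TopicPartition("test-topic", 0)
    // What getErrorResponse puts in the response for each partition of a failed request.
    val resp = new StartOffsetResponse(Errors.UNKNOWN, -1L)
    println(s"$tp -> $resp") // prints {error: UNKNOWN,offset: -1}
  }
}
```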
+ */ +public class GetStartOffsetResponse extends AbstractResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GET_START_OFFSET.id); + private static final String RESPONSES_KEY_NAME = "responses"; + + // topic level field names + private static final String TOPIC_KEY_NAME = "topic"; + private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; + + // partition level field names + private static final String PARTITION_KEY_NAME = "partition"; + private static final String ERROR_CODE_KEY_NAME = "error_code"; + + public static final long INVALID_OFFSET = -1L; + + /** + * Possible error code: + * + * CORRUPT_MESSAGE (2) + * UNKNOWN_TOPIC_OR_PARTITION (3) + * NOT_LEADER_FOR_PARTITION (6) + * MESSAGE_TOO_LARGE (10) + * INVALID_TOPIC (17) + * RECORD_LIST_TOO_LARGE (18) + * NOT_ENOUGH_REPLICAS (19) + * NOT_ENOUGH_REPLICAS_AFTER_APPEND (20) + * INVALID_REQUIRED_ACKS (21) + * TOPIC_AUTHORIZATION_FAILED (29) + */ + + private static final String BASE_OFFSET_KEY_NAME = "base_offset"; + + private final Map responses; + + /** + * Constructor for Version 0 + * @param responses Produced data grouped by topic-partition + */ + public GetStartOffsetResponse(Map responses) { + super(new Struct(ProtoUtils.responseSchema(ApiKeys.GET_START_OFFSET.id, 0))); + initCommonFields(responses); + this.responses = responses; + } + + /** + * Constructor for a specific version + * @param responses Produced data grouped by topic-partition + * @param version the version of schema to use. + */ + public GetStartOffsetResponse(Map responses, int version) { + super(new Struct(ProtoUtils.responseSchema(ApiKeys.GET_START_OFFSET.id, version))); + initCommonFields(responses); + this.responses = responses; + } + + /** + * Constructor from a {@link Struct}. It is the caller's responsibility to pass in a struct with the latest schema. 
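The constructors above delegate to initCommonFields (defined just below), which groups the flat TopicPartition-keyed map by topic via CollectionUtils.groupDataByTopic before writing the nested responses array. A plain-Scala equivalent of that grouping, with invented sample data:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.GetStartOffsetResponse.StartOffsetResponse

object GroupResponsesByTopic {
  def main(args: Array[String]): Unit = {
    val responses = Map(
      new TopicPartition("t1", 0) -> new StartOffsetResponse(Errors.OFFSET_LEO, 42L),
      new TopicPartition("t1", 1) -> new StartOffsetResponse(Errors.OFFSET_HW, 10L))
    // topic -> (partition -> response): the nesting written into the response struct
    val byTopic = responses.groupBy { case (tp, _) => tp.topic }
      .map { case (topic, parts) => topic -> parts.map { case (tp, r) => tp.partition -> r } }
    println(byTopic)
  }
}
```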
+ * @param struct + */ + public GetStartOffsetResponse(Struct struct) { + super(struct); + responses = new HashMap<>(); + for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) { + Struct topicRespStruct = (Struct) topicResponse; + String topic = topicRespStruct.getString(TOPIC_KEY_NAME); + for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) { + Struct partRespStruct = (Struct) partResponse; + int partition = partRespStruct.getInt(PARTITION_KEY_NAME); + Errors error = Errors.forCode(partRespStruct.getShort(ERROR_CODE_KEY_NAME)); + long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); + TopicPartition tp = new TopicPartition(topic, partition); + + responses.put(tp, new StartOffsetResponse(error, offset)); + } + } + } + + private void initCommonFields(Map responses) { + Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); + List topicDatas = new ArrayList<>(responseByTopic.size()); + for (Map.Entry> entry : responseByTopic.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entry.getKey()); + List partitionArray = new ArrayList<>(); + for (Map.Entry partitionEntry : entry.getValue().entrySet()) { + StartOffsetResponse part = partitionEntry.getValue(); + Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) + .set(PARTITION_KEY_NAME, partitionEntry.getKey()) + .set(ERROR_CODE_KEY_NAME, part.error.code()) + .set(BASE_OFFSET_KEY_NAME, part.baseOffset); + partitionArray.add(partStruct); + } + topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); + topicDatas.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + } + + public Map responses() { + return this.responses; + } + + public static final class StartOffsetResponse { + public Errors error; + public long baseOffset; + + public StartOffsetResponse(Errors error) { + this(error, INVALID_OFFSET); + } + + public StartOffsetResponse(Errors error, long baseOffset) { + this.error = error; + this.baseOffset = baseOffset; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + b.append("error: "); + b.append(error); + b.append(",offset: "); + b.append(baseOffset); + b.append('}'); + return b.toString(); + } + } + + public static GetStartOffsetResponse parse(ByteBuffer buffer) { + return new GetStartOffsetResponse(CURRENT_SCHEMA.read(buffer)); + } +} Index: core/src/main/scala/kafka/admin/AdminClient.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/admin/AdminClient.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/admin/AdminClient.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -13,6 +13,7 @@ package kafka.admin import java.nio.ByteBuffer +import java.util import java.util.{Collections, Properties} import java.util.concurrent.atomic.AtomicInteger @@ -30,7 +31,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{Cluster, Node, TopicPartition} +import org.apache.kafka.common.{Cluster, NewOffsetMetaData, Node, TopicPartition} import scala.collection.JavaConverters._ import scala.util.Try @@ -132,6 +133,16 @@ response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap } + + 
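How a follower consumes responses(): per the makeFollowers changes in ReplicaManager later in this patch, OFFSET_HW means "truncate the local log fully and restart at the returned offset (the leader's high watermark)", while OFFSET_LEO means "keep the local log and fetch from the returned offset (the follower's own LEO)". A sketch of that branching over an invented response map:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.GetStartOffsetResponse.StartOffsetResponse

object ConsumeStartOffsets {
  def main(args: Array[String]): Unit = {
    val offsets = Map(
      new TopicPartition("t1", 0) -> new StartOffsetResponse(Errors.OFFSET_LEO, 5000L),
      new TopicPartition("t1", 1) -> new StartOffsetResponse(Errors.OFFSET_HW, 1200L))
    offsets.foreach { case (tp, resp) =>
      if (resp.error == Errors.OFFSET_HW)
        println(s"$tp: truncate local log, start fetching at ${resp.baseOffset}") // leader HW
      else
        println(s"$tp: keep local log, start fetching at ${resp.baseOffset}")     // follower LEO
    }
  }
}
```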
def getStartOffset(broker: Node, partitionOffsetMetaDatas: util.LinkedHashMap[TopicPartition, NewOffsetMetaData]): + Map[TopicPartition, GetStartOffsetResponse.StartOffsetResponse] = { + val response = send(broker, ApiKeys.GET_START_OFFSET, new GetStartOffsetRequest.Builder(partitionOffsetMetaDatas)).asInstanceOf[GetStartOffsetResponse] + debug("AdminClient getStartOffset=" + response.responses()) + response.responses().asScala.map(response => + (response._1 -> new GetStartOffsetResponse.StartOffsetResponse(response._2.error, response._2.baseOffset)) + ).toMap + } + def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] = findAllBrokers.map { broker => broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava)) Index: core/src/main/scala/kafka/cluster/Partition.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/cluster/Partition.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/cluster/Partition.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ import com.yammer.metrics.core.Gauge -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{NewOffsetMetaData, TopicPartition} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.PartitionState import org.apache.kafka.common.utils.Time @@ -48,7 +48,7 @@ val topicPartition = new TopicPartition(topic, partitionId) private val localBrokerId = replicaManager.config.brokerId - private val logManager = replicaManager.logManager + val logManager = replicaManager.logManager private val zkUtils = replicaManager.zkUtils private val assignedReplicaMap = new Pool[Int, Replica] // The read lock is only required when multiple reads are executed and needs to be in a consistent manner @@ -232,13 +232,14 @@ /** * Update the log end offset of a certain replica of this partition */ - def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) { + def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult, + leaderOffsetMetaData: NewOffsetMetaData, followerOffsetMetaData: NewOffsetMetaData) { getReplica(replicaId) match { case Some(replica) => replica.updateLogReadResult(logReadResult) // check if we need to expand ISR to include this replica // if it is not in the ISR yet - maybeExpandIsr(replicaId, logReadResult) + maybeExpandIsr(replicaId, logReadResult, leaderOffsetMetaData, followerOffsetMetaData) debug("Recorded replica %d log end offset (LEO) position %d for partition %s." .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition)) @@ -253,6 +254,14 @@ } } + private def validNewOffsetMetaData(offsetMetaData: NewOffsetMetaData): Boolean = { + if (offsetMetaData.let != -1 && offsetMetaData.lst != -1) { + return true + } else { + return false + } + } + /** * Check and maybe expand the ISR of the partition. * A replica will be added to ISR if its LEO >= current hw of the partition. 
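A sketch of driving the new getStartOffset API above directly (broker addresses, topic, and offsets are placeholders); this mirrors what SmartExtendManager.sendRequest, at the end of this patch, does for each leader broker:

```scala
import java.util
import kafka.admin.AdminClient
import org.apache.kafka.common.{NewOffsetMetaData, Node, TopicPartition}

object GetStartOffsetExample {
  def main(args: Array[String]): Unit = {
    val admin = AdminClient.createSimplePlaintext("broker1:9092") // placeholder bootstrap address
    val leader = new Node(2, "broker2", 9092)                     // placeholder leader endpoint
    val payload = new util.LinkedHashMap[TopicPartition, NewOffsetMetaData]()
    // Follower's view of the partition: brokerid=1, leo, segment timestamps (lst/let), lso.
    payload.put(new TopicPartition("test-topic", 0),
      new NewOffsetMetaData(1, 5000L, 1500000000000L, 1500000600000L, 100L))
    try {
      val offsets = admin.getStartOffset(leader, payload)
      offsets.foreach { case (tp, resp) => println(s"$tp -> $resp") }
    } finally admin.close()
  }
}
```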
@@ -263,22 +272,43 @@ * * This function can be triggered when a replica's LEO has incremented */ - def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) { + def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult, + leaderOffsetMetaData: NewOffsetMetaData, followerOffsetMetaData: NewOffsetMetaData) { + val currentTime: Long = time.milliseconds val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) { // check if this replica needs to be added to the ISR leaderReplicaIfLocal match { case Some(leaderReplica) => val replica = getReplica(replicaId).get val leaderHW = leaderReplica.highWatermark - if(!inSyncReplicas.contains(replica) && - assignedReplicas.map(_.brokerId).contains(replicaId) && - replica.logEndOffset.offsetDiff(leaderHW) >= 0) { - val newInSyncReplicas = inSyncReplicas + replica - info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} " + - s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}") - // update ISR in ZK and cache - updateIsr(newInSyncReplicas) - replicaManager.isrExpandRate.mark() + + if (replicaManager.config.smartExtendEnable) { + if (!inSyncReplicas.contains(replica) && + assignedReplicas.map(_.brokerId).contains(replicaId) && + replica.logEndOffset.offsetDiff(leaderHW) >= 0 && + validNewOffsetMetaData(followerOffsetMetaData) && + ((currentTime - followerOffsetMetaData.lst) > replicaManager.config.replicaIsrLstEntryTimeMaxMs || + Math.abs(leaderOffsetMetaData.lso - followerOffsetMetaData.lso) < replicaManager.config.ReplicaIsrEntryISRMaxLag)) { + + val newInSyncReplicas = inSyncReplicas + replica + info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} " + + s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")} + leader=" + leaderOffsetMetaData + " follower=" + + followerOffsetMetaData + " currentTime=" + currentTime + " smartExtendEnable=" + replicaManager.config.smartExtendEnable) + // update ISR in ZK and cache + updateIsr(newInSyncReplicas) + replicaManager.isrExpandRate.mark() + } + } else { + if (!inSyncReplicas.contains(replica) && + assignedReplicas.map(_.brokerId).contains(replicaId) && + replica.logEndOffset.offsetDiff(leaderHW) >= 0) { + val newInSyncReplicas = inSyncReplicas + replica + info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} " + + s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}") + // update ISR in ZK and cache + updateIsr(newInSyncReplicas) + replicaManager.isrExpandRate.mark() + } } // check if the HW of the partition can now be incremented Index: core/src/main/scala/kafka/cluster/Replica.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/cluster/Replica.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/cluster/Replica.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -92,6 +92,12 @@ } } + def logStartOffset = + if (isLocal) + log.get.logStartOffset + else + throw new KafkaException(s"Should logStartOffset $topicPartition's local replica $brokerId") + def logEndOffset = if (isLocal) log.get.logEndOffsetMetadata Index: core/src/main/scala/kafka/log/Log.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 
1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/log/Log.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -110,7 +110,7 @@ @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ /* the actual segments of the log */ - private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] + val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] locally { val startMs = time.milliseconds Index: core/src/main/scala/kafka/server/KafkaApis.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/server/KafkaApis.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -21,10 +21,11 @@ import java.lang.{Long => JLong, Short => JShort} import java.util.{Collections, Properties} import java.util +import java.util.concurrent.TimeUnit import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse} -import kafka.cluster.Partition +import kafka.cluster.{BrokerEndPoint, Partition} import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.common._ import kafka.controller.KafkaController @@ -34,17 +35,17 @@ import kafka.network.RequestChannel.{Response, Session} import kafka.security.auth import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write} -import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils} +import kafka.utils.{Logging, Scheduler, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} import org.apache.kafka.common.record.{MemoryRecords, Record} -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.GetStartOffsetResponse.StartOffsetResponse +import org.apache.kafka.common.requests.{GetStartOffsetRequest, GetStartOffsetResponse, SaslHandshakeResponse, _} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{Node, TopicPartition} -import org.apache.kafka.common.requests.SaslHandshakeResponse +import org.apache.kafka.common.{NewOffsetMetaData, Node, TopicPartition} import scala.collection._ import scala.collection.JavaConverters._ @@ -65,9 +66,77 @@ val authorizer: Option[Authorizer], val quotas: QuotaManagers, val clusterId: String, - time: Time) extends Logging { + time: Time, + scheduler: Scheduler) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) + scheduler.schedule("send-offset-metadata", sendNewOffsetMetaData, period = config.sentOffsetMetaDataIntervalMs, unit = TimeUnit.MILLISECONDS) + scheduler.schedule("update-leader-offset", updateLeaderOffsetMetaData, period = config.updateLeaderOffsetIntervalMs, unit = TimeUnit.MILLISECONDS) + + private def sendNewOffsetMetaData(): Unit = { + debug("sendNewOffsetMetaData config.smartExtendEnable=" + config.smartExtendEnable) + if 
(config.smartExtendEnable) { + var smartExtendManager: SmartExtendManager = null + try { + val partitionsToMakeFollower: Set[Partition] = replicaManager.getFollowerPartitions() + + smartExtendManager = new SmartExtendManager(config) + val brokerPartitionMap: mutable.Map[BrokerEndPoint, mutable.Set[Partition]] = mutable.Map.empty + partitionsToMakeFollower.map { partition => + val remoteEndPoint: BrokerEndPoint = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName) + if (brokerPartitionMap.get(remoteEndPoint).isEmpty) { + val partitionSet: mutable.Set[Partition] = mutable.Set.empty + partitionSet.add(partition) + brokerPartitionMap.put(remoteEndPoint, partitionSet) + } else { + val partitionSet: mutable.Set[Partition] = brokerPartitionMap.get(remoteEndPoint).get + partitionSet.add(partition) + brokerPartitionMap.put(remoteEndPoint, partitionSet) + } + } + + val ResponseOffsetMap: mutable.Map[TopicPartition, GetStartOffsetResponse.StartOffsetResponse] = mutable.Map.empty + if (brokerPartitionMap.nonEmpty) { + brokerPartitionMap.map { brokerInfo => + val ResponseOffsetBroker = smartExtendManager.sendRequest(brokerInfo._1, brokerInfo._2).asScala + ResponseOffsetBroker.map { offsetInfo => + ResponseOffsetMap.put(offsetInfo._1, offsetInfo._2) + } + } + debug("sendNewOffsetMetaData ResponseOffsetMap=" + ResponseOffsetMap) + } else { + debug("sendNewOffsetMetaData brokerPartitionMap is empty") + } + + } catch { + case e: Exception => error("sendNewOffsetMetaData Exception, sending message to broker. " + e.getMessage) + } finally { + if (smartExtendManager != null) { + debug("sendNewOffsetMetaData brokerPartitionMap close...") + smartExtendManager.close + } + } + } + } + + def updateLeaderOffsetMetaData(): Unit = { + val partitionsToMakeLeaders: Set[Partition] = replicaManager.getLeaderPartitions().toSet + + val metaData = new mutable.HashMap[TopicPartition, NewOffsetMetaData] + partitionsToMakeLeaders.map { partition => + val lso: Long = partition.leaderReplicaIfLocal.get.logStartOffset + val leo: Long = partition.leaderReplicaIfLocal.get.logEndOffset.messageOffset + val lst: Long = partition.logManager.getLog(partition.topicPartition).get.segments.firstEntry().getValue.log.creationTime() + val let: Long = partition.logManager.getLog(partition.topicPartition).get.segments.lastEntry().getValue.log.file.lastModified() + metaData.put(partition.topicPartition, new NewOffsetMetaData(partition.leaderReplicaIdOpt.get, leo, lst, let, lso)) + } + if (metaData.nonEmpty) { + debug("updateLeaderOffsetMetaData updateNewOffsetMetaData broker=" + brokerId + " metaData=" + metaData) + replicaManager.updateNewOffsetMetaData(brokerId, metaData) + } else { + warn("updateLeaderOffsetMetaData updateNewOffsetMetaData broker=" + brokerId + " metaData is empty.") + } + } /** * Top-level method that handles all requests and multiplexes to the right api @@ -98,6 +167,7 @@ case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request) + case ApiKeys.GET_START_OFFSET => handleGetStartOffsetRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -1260,6 +1330,82 @@ } } + private def expriedOffset(topicPartition: TopicPartition, timestamp: Long, followerLeo: Long): Boolean = { + try { + val timestampOffset = fetchOffsetForTimestamp(replicaManager.logManager, 
topicPartition, timestamp) + if (followerLeo < timestampOffset.get.offset) { + warn(topicPartition + " offset expried, follower need trunc log, follower offset=" + followerLeo + " timestampOffset=" + timestampOffset.get + " timestamp=" + timestamp) + true + } else { + false + } + } catch { + case e: Throwable => + error("expriedOffset error, due to", e) + false + } + } + + def handleGetStartOffsetRequest(request: RequestChannel.Request) { + val startOffsetRequest = request.body.asInstanceOf[GetStartOffsetRequest] + + // 1. update follower status + debug("handleGetStartOffsetRequest updateNewOffsetMetaData from broker=" + startOffsetRequest.getBrokerId() + + " metadata=" + startOffsetRequest.partitionRecordsOrFail()) + val partitionRecordsMap = new mutable.HashMap[TopicPartition, NewOffsetMetaData] + startOffsetRequest.partitionRecordsOrFail.asScala.map { partition => + partitionRecordsMap.put(partition._1, partition._2) + + } + replicaManager.updateNewOffsetMetaData(startOffsetRequest.getBrokerId(), partitionRecordsMap) + + // 2. check leader offsetMap is init? + val metaData: mutable.HashMap[TopicPartition, NewOffsetMetaData] = new mutable.HashMap[TopicPartition, NewOffsetMetaData]() + startOffsetRequest.partitionRecordsOrFail.asScala.map { case (topicPartition, offsetMetaData) => + if (replicaManager.newOffsetMetaDataContains(brokerId)) { + if (!replicaManager.getNewOffsetMetaData(brokerId).contains(topicPartition)) { + val leo: Long = replicaManager.getPartition(topicPartition).get.leaderReplicaIfLocal.get.logEndOffset.messageOffset + val lso: Long = replicaManager.getPartition(topicPartition).get.leaderReplicaIfLocal.get.logStartOffset + val lst: Long = replicaManager.getPartition(topicPartition).get.logManager.getLog(topicPartition).get.segments.firstEntry().getValue.log.creationTime() + val let: Long = replicaManager.getPartition(topicPartition).get.logManager.getLog(topicPartition).get.segments.lastEntry().getValue.log.file.lastModified() + metaData.put(topicPartition, new NewOffsetMetaData(replicaManager.getPartition(topicPartition).get.leaderReplicaIdOpt.get, leo, lst, let, lso)) + } + } else { + debug("handleGetStartOffsetRequest initialize leader broker=" + brokerId) + val leo: Long = replicaManager.getPartition(topicPartition).get.leaderReplicaIfLocal.get.logEndOffset.messageOffset + val lso: Long = replicaManager.getPartition(topicPartition).get.leaderReplicaIfLocal.get.logStartOffset + val lst: Long = replicaManager.getPartition(topicPartition).get.logManager.getLog(topicPartition).get.segments.firstEntry().getValue.log.creationTime() + val let: Long = replicaManager.getPartition(topicPartition).get.logManager.getLog(topicPartition).get.segments.lastEntry().getValue.log.file.lastModified() + metaData.put(topicPartition, new NewOffsetMetaData(replicaManager.getPartition(topicPartition).get.leaderReplicaIdOpt.get, leo, lst, let, lso)) + } + } + if (metaData.nonEmpty) { + warn("handleGetStartOffsetRequest leader map uninitialize,updateNewOffsetMetaData broker=" + brokerId + " metaData=" + metaData) + replicaManager.updateNewOffsetMetaData(brokerId, metaData) + } + + // 3. 
return 'offset' respone for follower + val responses: Map[TopicPartition, StartOffsetResponse] = + startOffsetRequest.partitionRecordsOrFail.asScala.map { case (topicPartition, offsetMetaData) => + var baseOffset: Long = 0 + var errorCode: Errors = Errors.NONE + val timestamp = Time.SYSTEM.milliseconds() - config.replicaExpriedMaxMs + if ((replicaManager.getNewOffsetMetaData(brokerId).get(topicPartition).get.leo - offsetMetaData.leo) > config.replicaLstTimeMaxMs + && expriedOffset(topicPartition, timestamp, offsetMetaData.leo)) { + baseOffset = replicaManager.getPartition(topicPartition).get.leaderReplicaIfLocal.get.highWatermark.messageOffset + errorCode = Errors.OFFSET_HW + } else { + baseOffset = offsetMetaData.leo + errorCode = Errors.OFFSET_LEO + } + (new TopicPartition(topicPartition.topic(), topicPartition.partition()), new StartOffsetResponse(errorCode, baseOffset)) + }.toMap + + debug("handleGetStartOffsetRequest return responses=" + responses) + val responseBody = new GetStartOffsetResponse(responses.asJava) + requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) + } + def authorizeClusterAction(request: RequestChannel.Request): Unit = { if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") Index: core/src/main/scala/kafka/server/KafkaConfig.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -24,10 +24,11 @@ import kafka.consumer.ConsumerConfig import kafka.coordinator.OffsetConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} +import kafka.server.KafkaConfig.{ReplicaIsrEntryISRMaxLagDoc, ReplicaIsrLstEntryTimeMaxMsDoc, _} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.ConfigDef.ValidList -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} +import org.apache.kafka.common.config._ import org.apache.kafka.common.metrics.{MetricsReporter, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol @@ -38,6 +39,11 @@ import scala.collection.JavaConverters._ object Defaults { + /** ********* SmartExtendManager Configuration ***********/ + val sentOffsetMetaDataIntervalMs = 30 * 1000L + val getStartOffsetMaxRetries = 10 + val updateLeaderOffsetIntervalMs = 30 * 1000L + /** ********* Zookeeper Configuration ***********/ val ZkSessionTimeoutMs = 6000 val ZkSyncTimeMs = 2000 @@ -203,6 +209,17 @@ def main(args: Array[String]) { System.out.println(configDef.toHtmlTable) } + /** ********* SmartExtendManager Configuration ***********/ + val ReplicaExpriedMaxMsProp = "replica.trunclog.expried.ms" + val ReplicaLstTimeMaxMsProp = "replica.trunclog.lag" + val sentOffsetMetaDataIntervalMsProp = "sent.offset.metadata.check.interval.ms" + val SmartExtendEnableProp = "smart.extend.enable" + val BootstarpServersProp = "bootstrap.servers" + val getStartOffsetretriesProp = "getstartoffset.max.retries.num" + val updateLeaderOffsetIntervalMsProp = "leader.update.offset.check.interval.ms" + val ReplicaIsrLstEntryTimeMaxMsProp = 
"replica.entry.isr.lst.lag.ms" + val ReplicaIsrEntryISRMaxLagProp= "replica.entry.isr.lso.lag" + /** ********* Zookeeper Configuration ***********/ val ZkConnectProp = "zookeeper.connect" @@ -369,6 +386,16 @@ val SaslKerberosPrincipalToLocalRulesProp = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES /* Documentation */ + /** ********* SmartExtendManager Configuration ***********/ + val ReplicaLstTimeMaxMsPropDoc = "return HW or LEO?" + val sentOffsetMetaDataIntervalMsDoc = "sent offset metadata interval" + val SmartExtendEnableDoc = "enable SmartExtend RPC?" + val BootstarpServersDoc = "Bootstarp Server string for smart extend cluster" + val getStartOffsetretriesDoc = "get start offset max retries" + val updateLeaderOffsetIntervalMsDoc = "leader update offset interval" + val ReplicaIsrLstEntryTimeMaxMsDoc = "Is entry ISR(lst)?" + val ReplicaIsrEntryISRMaxLagDoc = "Is entry ISR(lso)" + /** ********* Zookeeper Configuration ***********/ val ZkConnectDoc = "Zookeeper host string" val ZkSessionTimeoutMsDoc = "Zookeeper session timeout" @@ -610,6 +637,17 @@ import ConfigDef.ValidString._ new ConfigDef() + /** ********* SmartExtendManager Configuration ***********/ + .define(ReplicaExpriedMaxMsProp, LONG, HIGH, ReplicaLstTimeMaxMsPropDoc) + .define(ReplicaLstTimeMaxMsProp, LONG, HIGH, ReplicaLstTimeMaxMsPropDoc) + .define(sentOffsetMetaDataIntervalMsProp, LONG, Defaults.sentOffsetMetaDataIntervalMs, HIGH, sentOffsetMetaDataIntervalMsDoc) + .define(SmartExtendEnableProp, BOOLEAN, HIGH, SmartExtendEnableDoc) + .define(BootstarpServersProp, STRING, HIGH, BootstarpServersDoc) + .define(getStartOffsetretriesProp, INT, Defaults.getStartOffsetMaxRetries, HIGH, getStartOffsetretriesDoc) + .define(updateLeaderOffsetIntervalMsProp, LONG, Defaults.updateLeaderOffsetIntervalMs, HIGH, updateLeaderOffsetIntervalMsDoc) + .define(ReplicaIsrLstEntryTimeMaxMsProp, LONG, HIGH, ReplicaIsrLstEntryTimeMaxMsDoc) + .define(ReplicaIsrEntryISRMaxLagProp, LONG, HIGH, ReplicaIsrEntryISRMaxLagDoc) + /** ********* Zookeeper Configuration ***********/ .define(ZkConnectProp, STRING, HIGH, ZkConnectDoc) @@ -812,6 +850,18 @@ def this(props: java.util.Map[_, _]) = this(props, true) + /** ********* SmartExtendManager Configuration ***********/ + val replicaExpriedMaxMs: Long = getLong(KafkaConfig.ReplicaExpriedMaxMsProp) + val replicaLstTimeMaxMs: Long = getLong(KafkaConfig.ReplicaLstTimeMaxMsProp) + val sentOffsetMetaDataIntervalMs: Long = getLong(KafkaConfig.sentOffsetMetaDataIntervalMsProp) + val smartExtendEnable: Boolean = getBoolean(KafkaConfig.SmartExtendEnableProp) + val bootstarpServers : String = getString(KafkaConfig.BootstarpServersProp) + val getStartOffsetRetries: Int = getInt(KafkaConfig.getStartOffsetretriesProp) + val updateLeaderOffsetIntervalMs: Long = getLong(KafkaConfig.updateLeaderOffsetIntervalMsProp) + val replicaIsrLstEntryTimeMaxMs: Long = getLong(KafkaConfig.ReplicaIsrLstEntryTimeMaxMsProp) + val ReplicaIsrEntryISRMaxLag: Long = getLong(KafkaConfig.ReplicaIsrEntryISRMaxLagProp) + + /** ********* Zookeeper Configuration ***********/ val zkConnect: String = getString(KafkaConfig.ZkConnectProp) val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp) Index: core/src/main/scala/kafka/server/KafkaServer.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ 
core/src/main/scala/kafka/server/KafkaServer.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -240,7 +240,7 @@ /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, - clusterId, time) + clusterId, time, kafkaScheduler) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads) Index: core/src/main/scala/kafka/server/ReplicaManager.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1f4c0944eafc4947f6a5cc426c3e195f40810d6d) +++ core/src/main/scala/kafka/server/ReplicaManager.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -22,19 +22,19 @@ import com.yammer.metrics.core.Gauge import kafka.api._ -import kafka.cluster.{Partition, Replica} +import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.common._ import kafka.controller.KafkaController import kafka.log.{Log, LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.UnboundedQuota import kafka.utils._ -import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{ControllerMovedException => _, NotLeaderForPartitionException => _, OffsetOutOfRangeException => _, ReplicaNotAvailableException => _, UnknownTopicOrPartitionException => _, _} +import org.apache.kafka.common.{NewOffsetMetaData, TopicPartition} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest} +import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Time import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -125,6 +125,9 @@ private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]() private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis()) private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis()) + var newOffsetMetaDataMap : mutable.Map[Int, mutable.Map[TopicPartition, NewOffsetMetaData]] = new mutable.HashMap[Int, mutable.Map[TopicPartition, NewOffsetMetaData]]() + + private val offsetMapLock = new Object val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce]( purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests) @@ -154,6 +157,61 @@ val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) + def updateNewOffsetMetaData(brokerId: Int, metaData: mutable.HashMap[TopicPartition, NewOffsetMetaData]): Unit = { + offsetMapLock synchronized { + if (newOffsetMetaDataMap.contains(brokerId)) { + metaData.map { metaInfo => + 
newOffsetMetaDataMap(brokerId).put(metaInfo._1, metaInfo._2) + } + } else { + newOffsetMetaDataMap.put(brokerId, metaData) + } + } + } + + def clearNewOffsetMetaData(partions: mutable.Set[Partition]): Unit = { + info("clearNewOffsetMetaData partions=" + partions) + offsetMapLock synchronized { + partions.map { partition => + newOffsetMetaDataMap.map { brokerInfo => + brokerInfo._2.map { metaData => + if (metaData._1.equals(partition.topicPartition)) { + newOffsetMetaDataMap(brokerInfo._1).put(metaData._1, new NewOffsetMetaData(-2, -1, -1, -1, -1)) + } + } + } + } + } + } + + def getOffsetMetaDataForPartition(brokerId: Int, topicPartition: TopicPartition): NewOffsetMetaData = { + offsetMapLock synchronized { + if (newOffsetMetaDataMap.nonEmpty && newOffsetMetaDataMap.contains(brokerId)) { + if (newOffsetMetaDataMap(brokerId).contains(topicPartition)) { + return newOffsetMetaDataMap(brokerId).get(topicPartition).get + } else { + return new NewOffsetMetaData(brokerId, -1, -1, -1, -1) + } + } else { + return new NewOffsetMetaData(brokerId, -1, -1, -1, -1) + } + } + } + + def getNewOffsetMetaData(brokerId: Int): mutable.Map[TopicPartition, NewOffsetMetaData] = { + return offsetMapLock synchronized { newOffsetMetaDataMap(brokerId) } + } + + def newOffsetMetaDataContains(brokerId: Int): Boolean = { + return offsetMapLock synchronized { + if (newOffsetMetaDataMap.contains(brokerId)) { + return true + } else { + return false + } + } + } + def underReplicatedPartitionCount(): Int = { getLeaderPartitions().count(_.isUnderReplicated) } @@ -761,6 +819,19 @@ "%d epoch %d with correlation id %d for partition %s") .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) } + + // Init partition newOffsetMetaDataMap for Leader + val metaData = new mutable.HashMap[TopicPartition, NewOffsetMetaData] + partitionsToMakeLeaders.map{ partition => + val lso: Long = partition.leaderReplicaIfLocal.get.logStartOffset + val leo: Long = partition.leaderReplicaIfLocal.get.logEndOffset.messageOffset + val lst: Long = partition.logManager.getLog(partition.topicPartition).get.segments.firstEntry().getValue.log.creationTime() + val let: Long = partition.logManager.getLog(partition.topicPartition).get.segments.lastEntry().getValue.log.file.lastModified() + metaData.put(partition.topicPartition, new NewOffsetMetaData(partition.leaderReplicaIdOpt.get, leo, lst, let, lso)) + } + + info("makeLeaders updateNewOffsetMetaData broker=" + localBrokerId + " metaData=" + metaData) + updateNewOffsetMetaData(localBrokerId, metaData) } catch { case e: Throwable => partitionState.keys.foreach { partition => @@ -854,6 +925,7 @@ logManager.truncateTo(partitionsToMakeFollower.map { partition => (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset) }.toMap) + partitionsToMakeFollower.foreach { partition => val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition) tryCompleteDelayedProduce(topicPartitionOperationKey) @@ -874,12 +946,63 @@ } } else { - // we do not need to check if the leader exists again since this has been done at the beginning of this process - val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - partition.topicPartition -> BrokerAndInitialOffset( - metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), - partition.getReplica().get.logEndOffset.messageOffset)).toMap - 
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + debug("makefollowers config.smartExtendEnable=" + config.smartExtendEnable) + if (config.smartExtendEnable) { + try { + // 1. clear NewOffsetMetaData. + clearNewOffsetMetaData(partitionsToMakeFollower) + + // 2. get offset from leader. + val ResponseOffsetMap = getBestOffset(partitionsToMakeFollower, metadataCache, config.getStartOffsetRetries) + + if (ResponseOffsetMap.nonEmpty) { + val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => + partition.topicPartition -> BrokerAndInitialOffset( + metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), + ResponseOffsetMap.get(partition.topicPartition).get.baseOffset)).toMap + + // 3. trunc log + val partitionOffsets: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]() + ResponseOffsetMap.map { partition => + if (partition._2.error == Errors.OFFSET_HW) { + partitionOffsets.put(partition._1, partition._2.baseOffset) + } + } + + if (partitionOffsets.nonEmpty) { + info("makefollowers trunc log, partitionOffsets size=" + partitionOffsets.size + " ResponseOffsetMap size=" + + ResponseOffsetMap.size + " partitionOffsets=" + partitionOffsets + " ResponseOffsetMap=" + ResponseOffsetMap) + partitionOffsets.map { partitionInfo => + logManager.truncateFullyAndStartAt(partitionInfo._1, partitionInfo._2) + } + } + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + } else { + val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => + partition.topicPartition -> BrokerAndInitialOffset( + metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), + partition.getReplica().get.logEndOffset.messageOffset)).toMap + error("makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset) + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + } + } catch { + case e: Exception => + val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => + partition.topicPartition -> BrokerAndInitialOffset( + metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), + partition.getReplica().get.logEndOffset.messageOffset)).toMap + error("ReplicaManager makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset + + " Exception=" + e.getMessage) + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + } + } else { + // we do not need to check if the leader exists again since this has been done at the beginning of this process + val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => + partition.topicPartition -> BrokerAndInitialOffset( + metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), + partition.getReplica().get.logEndOffset.messageOffset)).toMap + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + } partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d started fetcher to new leader 
as part of become-follower request from controller " + @@ -905,6 +1028,61 @@ partitionsToMakeFollower } + private def getBestOffset(partitionsToMakeFollower: mutable.Set[Partition], metadataCache: MetadataCache, retries: Int): + mutable.Map[TopicPartition, GetStartOffsetResponse.StartOffsetResponse] = { + var successGetBestOffset: Boolean = false + var remainingRetries = retries + var smartExtendManager: SmartExtendManager = null + var ResponseOffsetMap: mutable.Map[TopicPartition, GetStartOffsetResponse.StartOffsetResponse] = mutable.Map.empty + while (!successGetBestOffset && remainingRetries > 0) { + remainingRetries = remainingRetries - 1 + try { + info("start getBestOffset=" + partitionsToMakeFollower + " metadataCache=" + metadataCache) + smartExtendManager = new SmartExtendManager(config) + val brokerPartitionMap: mutable.Map[BrokerEndPoint, mutable.Set[Partition]] = mutable.Map.empty + partitionsToMakeFollower.map { partition => + val remoteEndPoint: BrokerEndPoint = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName) + if (brokerPartitionMap.get(remoteEndPoint).isEmpty) { + val partitionSet: mutable.Set[Partition] = mutable.Set.empty + partitionSet.add(partition) + brokerPartitionMap.put(remoteEndPoint, partitionSet) + } else { + val partitionSet: mutable.Set[Partition] = brokerPartitionMap.get(remoteEndPoint).get + partitionSet.add(partition) + brokerPartitionMap.put(remoteEndPoint, partitionSet) + } + } + + brokerPartitionMap.map { brokerInfo => + val ResponseOffsetBroker = smartExtendManager.sendRequest(brokerInfo._1, brokerInfo._2).asScala + ResponseOffsetBroker.map { offsetInfo => + if (offsetInfo._2.error == Errors.UNKNOWN && offsetInfo._2.baseOffset == -1) { + throw new KafkaException("offsetInfo._2.error=Errors.UNKNOWN and offsetInfo._2.baseOffset=-1 broker=" + + brokerInfo) + } + ResponseOffsetMap.put(offsetInfo._1, offsetInfo._2) + } + } + info("finish getBestOffset ResponseOffsetMap=" + ResponseOffsetMap) + successGetBestOffset = true + } catch { + case e: Exception => + error("ReplicaManager getBestOffset Exception=" + e.getMessage + " in " + remainingRetries + "/" + retries) + Thread.sleep(100) + ResponseOffsetMap = mutable.Map.empty + case t: Throwable => + error("ReplicaManager getBestOffset Throwable=" + t.getMessage + " in " + remainingRetries + "/" + retries) + Thread.sleep(100) + ResponseOffsetMap = mutable.Map.empty + } finally { + if (smartExtendManager != null) { + smartExtendManager.close + } + } + } + ResponseOffsetMap + } + private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs)) @@ -915,7 +1093,9 @@ readResults.foreach { case (topicPartition, readResult) => getPartition(topicPartition) match { case Some(partition) => - partition.updateReplicaLogReadResult(replicaId, readResult) + val leaderOffsetMetaData: NewOffsetMetaData = getOffsetMetaDataForPartition(partition.leaderReplicaIdOpt.get, topicPartition) + val followerOffsetMetaData: NewOffsetMetaData = getOffsetMetaDataForPartition(replicaId, topicPartition) + partition.updateReplicaLogReadResult(replicaId, readResult, leaderOffsetMetaData, followerOffsetMetaData) // for producer requests with ack > 1, we need to check // if they can be unblocked after some follower's log end offsets have moved @@ -926,10 +1106,14 @@ } } - private def getLeaderPartitions(): 
List[Partition] = { + def getLeaderPartitions(): List[Partition] = { allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList } + def getFollowerPartitions(): Set[Partition] = { + allPartitions.values.filterNot(_.leaderReplicaIfLocal.isDefined).toSet + } + def getHighWatermark(topicPartition: TopicPartition): Option[Long] = { getPartition(topicPartition).flatMap { partition => partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset) Index: core/src/main/scala/kafka/server/SmartExtendManager.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/SmartExtendManager.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) +++ core/src/main/scala/kafka/server/SmartExtendManager.scala (revision dfb2d3156fcbf445d8615aa194e518e09150c42f) @@ -0,0 +1,45 @@ +package kafka.server + +import java.util + +import kafka.admin.AdminClient +import kafka.cluster.{BrokerEndPoint, Partition} +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.Logging +import org.apache.kafka.common.{NewOffsetMetaData, Node, TopicPartition} +import org.apache.kafka.common.requests.GetStartOffsetResponse + +import scala.collection.mutable + +class SmartExtendManager (val config: KafkaConfig) extends Logging with KafkaMetricsGroup { + private val SmartExtendChannel = AdminClient.createSimplePlaintext(config.bootstarpServers) + + private def getLogTimestamp(partition: Partition, isLst: Boolean): Long = { + if (isLst) { + partition.logManager.getLog(partition.topicPartition).get.segments.firstEntry().getValue.log.creationTime() + } else { + partition.logManager.getLog(partition.topicPartition).get.segments.lastEntry().getValue.log.file.lastModified() + } + } + + def close: Unit = { + SmartExtendChannel.close() + } + + def sendRequest(broker: BrokerEndPoint, partitions: mutable.Set[Partition]): util.Map[TopicPartition, GetStartOffsetResponse.StartOffsetResponse] = { + val partitionOffsetMetaDatas = new util.LinkedHashMap[TopicPartition, NewOffsetMetaData] + partitions.map{ partition => + // local brokerid + val brokerid:Int = config.brokerId + val leo:Long = partition.getReplica().get.logEndOffset.messageOffset + val lso:Long = partition.getReplica().get.logStartOffset + val lst:Long = getLogTimestamp(partition, true) + val let:Long = getLogTimestamp(partition, false) + partitionOffsetMetaDatas.put(partition.topicPartition, new NewOffsetMetaData(brokerid, leo, lst, let ,lso)) + } + val node = new Node(broker.id, broker.host, broker.port) + debug("sendRequest broker=" + broker + " partitionOffsetMetaDatas=" + partitionOffsetMetaDatas) + import scala.collection.JavaConversions.mapAsJavaMap + SmartExtendChannel.getStartOffset(node, partitionOffsetMetaDatas) + } +}
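For reference, a sketch of the broker properties this patch reads (key names are taken verbatim from the KafkaConfig changes above; the values are illustrative only). The keys defined without a default in the ConfigDef would have to be set explicitly before enabling the feature:

```scala
import java.util.Properties

object SmartExtendProps {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("smart.extend.enable", "true")
    props.put("bootstrap.servers", "broker1:9092")               // target for SmartExtendManager's AdminClient
    props.put("replica.trunclog.expried.ms", "86400000")         // lookback window used by expriedOffset in KafkaApis
    props.put("replica.trunclog.lag", "100000")                  // LEO lag beyond which the leader answers OFFSET_HW
    props.put("sent.offset.metadata.check.interval.ms", "30000") // follower -> leader metadata push period
    props.put("leader.update.offset.check.interval.ms", "30000") // leader offset-metadata refresh period
    props.put("getstartoffset.max.retries.num", "10")            // retries in ReplicaManager.getBestOffset
    props.put("replica.entry.isr.lst.lag.ms", "600000")          // segment-age gate for ISR entry
    props.put("replica.entry.isr.lso.lag", "1000")               // max |leader LSO - follower LSO| for ISR entry
    props.list(System.out)
  }
}
```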