From 298fd8a523c593666fc904fb0b248cec79f75058 Mon Sep 17 00:00:00 2001
From: Joel Koshy
Date: Tue, 5 May 2015 14:12:22 -0700
Subject: [PATCH 1/3] Rename from stale to expired

---
 .../main/scala/kafka/server/OffsetManager.scala | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 18680ce..c7785f5 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -48,7 +48,7 @@ import org.I0Itec.zkclient.ZkClient
  * @param maxMetadataSize The maximum allowed metadata for any offset commit.
  * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
  * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
- * @param offsetsRetentionCheckIntervalMs Frequency at which to check for stale offsets.
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
  * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
  * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
  *                                 log compaction and faster offset loads
@@ -100,8 +100,8 @@ class OffsetManager(val config: OffsetManagerConfig,
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
 
-  scheduler.schedule(name = "offsets-cache-compactor",
-                     fun = compact,
+  scheduler.schedule(name = "offsets-cache-garbage-collector",
+                     fun = collectGarbage,
                      period = config.offsetsRetentionCheckIntervalMs,
                      unit = TimeUnit.MILLISECONDS)
 
@@ -117,20 +117,20 @@ class OffsetManager(val config: OffsetManagerConfig,
     }
   )
 
-  private def compact() {
-    debug("Compacting offsets cache.")
+  private def collectGarbage() {
+    debug("Garbage collecting expired offsets.")
     val startMs = SystemTime.milliseconds
 
-    val staleOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
+    val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
       offsetAndMetadata.expireTimestamp < startMs
     }
 
-    debug("Found %d expired offsets.".format(staleOffsets.size))
+    debug("Found %d expired offsets.".format(expiredOffsets.size))
 
-    // delete the stale offsets from the table and generate tombstone messages to remove them from the log
-    val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
+    // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+    val tombstonesForPartition = expiredOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
       val offsetsPartition = partitionFor(groupTopicAndPartition.group)
-      trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
+      trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
 
       offsetsCache.remove(groupTopicAndPartition)
 
@@ -141,7 +141,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     }.groupBy{ case (partition, tombstone) => partition }
 
     // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
-    // if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
+    // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
     val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
       val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
       partitionOpt.map { partition =>
@@ -158,14 +158,14 @@ class OffsetManager(val config: OffsetManagerConfig,
         }
         catch {
           case t: Throwable =>
-            error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)
+            error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
             // ignore and continue
             0
         }
       }
     }.sum
 
-    debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
+    debug("Removed %d expired offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
   }
-- 
1.7.12.4
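The method renamed above (compact -> collectGarbage) scans the in-memory offsets cache, picks out entries whose expireTimestamp has passed, removes them, and appends tombstones to the offsets topic. As a rough, self-contained sketch of that expiration scan, with simplified stand-in case classes instead of Kafka's Pool, GroupTopicPartition, and OffsetAndMetadata (the names and shape below are illustrative assumptions, not the actual OffsetManager code):

    // Illustrative sketch only: a simplified model of the expired-offset scan that
    // collectGarbage/deleteExpiredOffsets performs. The case classes below are
    // stand-ins for Kafka's GroupTopicPartition and OffsetAndMetadata, not the real types.
    object ExpiredOffsetScanSketch {
      case class GroupTopicPartition(group: String, topic: String, partition: Int)
      case class OffsetAndMetadata(offset: Long, expireTimestamp: Long)

      // Returns the expired entries and removes them from the cache, mirroring the
      // filter-then-remove shape of the patched method.
      def expireOffsets(cache: scala.collection.mutable.Map[GroupTopicPartition, OffsetAndMetadata],
                        nowMs: Long): Map[GroupTopicPartition, OffsetAndMetadata] = {
        val expired = cache.filter { case (_, om) => om.expireTimestamp < nowMs }.toMap
        expired.keys.foreach(cache.remove)
        expired
      }

      def main(args: Array[String]): Unit = {
        val cache = scala.collection.mutable.Map(
          GroupTopicPartition("g1", "t", 0) -> OffsetAndMetadata(42L, expireTimestamp = 1000L),
          GroupTopicPartition("g2", "t", 0) -> OffsetAndMetadata(7L, expireTimestamp = 5000L))
        val expired = expireOffsets(cache, nowMs = 2000L)
        println(s"Expired ${expired.size} offsets; ${cache.size} remain") // Expired 1 offsets; 1 remain
      }
    }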
From e4167385f1173b8c08665b67c7af9c5e0c46f442 Mon Sep 17 00:00:00 2001
From: Joel Koshy
Date: Tue, 5 May 2015 18:02:26 -0700
Subject: [PATCH 2/3] fix

---
 .../main/scala/kafka/server/OffsetManager.scala | 136 +++++++++++----------
 1 file changed, 71 insertions(+), 65 deletions(-)

diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index c7785f5..a42aa68 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -94,14 +94,16 @@ class OffsetManager(val config: OffsetManagerConfig,
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
 
   private val followerTransitionLock = new Object
 
+  private val cleanupOrLoadMutex = new Object
+
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
 
   private val shuttingDown = new AtomicBoolean(false)
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
 
-  scheduler.schedule(name = "offsets-cache-garbage-collector",
-                     fun = collectGarbage,
+  scheduler.schedule(name = "delete-expired-consumer-offsets",
+                     fun = deleteExpiredOffsets,
                      period = config.offsetsRetentionCheckIntervalMs,
                      unit = TimeUnit.MILLISECONDS)
 
@@ -117,55 +119,57 @@ class OffsetManager(val config: OffsetManagerConfig,
     }
   )
 
-  private def collectGarbage() {
+  private def deleteExpiredOffsets() {
     debug("Garbage collecting expired offsets.")
     val startMs = SystemTime.milliseconds
 
-    val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
-      offsetAndMetadata.expireTimestamp < startMs
-    }
+    val numExpiredOffsetsRemoved = cleanupOrLoadMutex synchronized {
+      val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
+        offsetAndMetadata.expireTimestamp < startMs
+      }
 
-    debug("Found %d expired offsets.".format(expiredOffsets.size))
+      debug("Found %d expired offsets.".format(expiredOffsets.size))
 
-    // delete the expired offsets from the table and generate tombstone messages to remove them from the log
-    val tombstonesForPartition = expiredOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
-      val offsetsPartition = partitionFor(groupTopicAndPartition.group)
-      trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
+      // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+      val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
+        val offsetsPartition = partitionFor(groupTopicAndPartition.group)
+        trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
 
-      offsetsCache.remove(groupTopicAndPartition)
+        offsetsCache.remove(groupTopicAndPartition)
 
-      val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
-        groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
+        val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
+          groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
 
-      (offsetsPartition, new Message(bytes = null, key = commitKey))
-    }.groupBy{ case (partition, tombstone) => partition }
+        (offsetsPartition, new Message(bytes = null, key = commitKey))
+      }.groupBy { case (partition, tombstone) => partition }
 
-    // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
-    // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
-    val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
-      val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
-      partitionOpt.map { partition =>
-        val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
-        val messages = tombstones.map(_._2).toSeq
+      // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
+      // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
+      tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
+        val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+        partitionOpt.map { partition =>
+          val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+          val messages = tombstones.map(_._2).toSeq
 
-        trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
+          trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
 
-        try {
-          // do not need to require acks since even if the tombsone is lost,
-          // it will be appended again in the next purge cycle
-          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
-          tombstones.size
-        }
-        catch {
-          case t: Throwable =>
-            error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
-            // ignore and continue
-            0
+          try {
+            // do not need to require acks since even if the tombsone is lost,
+            // it will be appended again in the next purge cycle
+            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
+            tombstones.size
+          }
+          catch {
+            case t: Throwable =>
+              error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
+              // ignore and continue
+              0
+          }
         }
-      }
-    }.sum
+      }.sum
+    }
 
-    debug("Removed %d expired offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
+    debug("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
   }
 
@@ -369,34 +373,36 @@ class OffsetManager(val config: OffsetManagerConfig,
           var currOffset = log.logSegments.head.baseOffset
           val buffer = ByteBuffer.allocate(config.loadBufferSize)
           // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
-          while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
-            buffer.clear()
-            val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
-            messages.readInto(buffer, 0)
-            val messageSet = new ByteBufferMessageSet(buffer)
-            messageSet.foreach { msgAndOffset =>
-              require(msgAndOffset.message.key != null, "Offset entry key should not be null")
-              val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
-              if (msgAndOffset.message.payload == null) {
-                if (offsetsCache.remove(key) != null)
-                  trace("Removed offset for %s due to tombstone entry.".format(key))
-                else
-                  trace("Ignoring redundant tombstone for %s.".format(key))
-              } else {
-                // special handling for version 0:
-                // set the expiration time stamp as commit time stamp + server default retention time
-                val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
-                putOffset(key, value.copy (
-                  expireTimestamp = {
-                    if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-                      value.commitTimestamp + config.offsetsRetentionMs
-                    else
-                      value.expireTimestamp
-                  }
-                ))
-                trace("Loaded offset %s for %s.".format(value, key))
+          cleanupOrLoadMutex synchronized {
+            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
+              buffer.clear()
+              val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+              messages.readInto(buffer, 0)
+              val messageSet = new ByteBufferMessageSet(buffer)
+              messageSet.foreach { msgAndOffset =>
+                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
+                val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
+                if (msgAndOffset.message.payload == null) {
+                  if (offsetsCache.remove(key) != null)
+                    trace("Removed offset for %s due to tombstone entry.".format(key))
+                  else
+                    trace("Ignoring redundant tombstone for %s.".format(key))
+                } else {
+                  // special handling for version 0:
+                  // set the expiration time stamp as commit time stamp + server default retention time
+                  val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
+                  putOffset(key, value.copy (
+                    expireTimestamp = {
+                      if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                        value.commitTimestamp + config.offsetsRetentionMs
+                      else
+                        value.expireTimestamp
+                    }
+                  ))
+                  trace("Loaded offset %s for %s.".format(value, key))
+                }
+                currOffset = msgAndOffset.nextOffset
               }
-              currOffset = msgAndOffset.nextOffset
             }
           }
-- 
1.7.12.4
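The substantive change in the second patch is the new cleanupOrLoadMutex: both the expiration pass and the offsets-topic load loop now run inside the same synchronized block, so a cleanup cycle can no longer interleave with a load that is repopulating the cache. A minimal sketch of that locking pattern, assuming a toy cache and simplified method signatures (none of which are the real OffsetManager API):

    // Illustrative sketch only: the locking pattern patch 2 introduces, with the
    // cache and the two operations reduced to minimal placeholders. In the real
    // OffsetManager the same cleanupOrLoadMutex guards deleteExpiredOffsets() and
    // the loop that loads offsets from an offsets topic partition.
    object CleanupOrLoadMutexSketch {
      private val cleanupOrLoadMutex = new Object
      private val cache = scala.collection.mutable.Map[String, Long]() // key -> expiry timestamp (ms)

      // Periodic cleanup: runs entirely inside the mutex so it never interleaves with a load.
      def deleteExpired(nowMs: Long): Int = cleanupOrLoadMutex synchronized {
        val expired = cache.filter { case (_, expireMs) => expireMs < nowMs }.keys.toSeq
        expired.foreach(cache.remove)
        expired.size
      }

      // Load path: also takes the mutex, so entries being (re)loaded cannot be expired mid-load.
      def loadEntries(entries: Seq[(String, Long)]): Unit = cleanupOrLoadMutex synchronized {
        entries.foreach { case (key, expireMs) => cache.put(key, expireMs) }
      }

      def main(args: Array[String]): Unit = {
        loadEntries(Seq("g1-t-0" -> 1000L, "g2-t-0" -> 5000L))
        println(s"Removed ${deleteExpired(nowMs = 2000L)} expired entries") // Removed 1 expired entries
      }
    }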
From 5cb0ccf8036746279ff7656933ca84643ad4ca52 Mon Sep 17 00:00:00 2001
From: Joel Koshy
Date: Wed, 6 May 2015 14:29:33 -0700
Subject: [PATCH 3/3] renames and logging improvements

---
 core/src/main/scala/kafka/cluster/Partition.scala     |  2 +-
 core/src/main/scala/kafka/server/OffsetManager.scala  | 19 ++++++++++---------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 122b1db..730a232 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -219,7 +219,7 @@ class Partition(val topic: String,
         if (topic == OffsetManager.OffsetsTopicName &&
             /* if we are making a leader->follower transition */
             leaderReplica == localBrokerId)
-          offsetManager.clearOffsetsInPartition(partitionId)
+          offsetManager.removeOffsetsFromCacheForPartition(partitionId)
       }
 
       if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index a42aa68..df919f7 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -93,11 +93,8 @@ class OffsetManager(val config: OffsetManagerConfig,
   /* offsets and metadata cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
 
   private val followerTransitionLock = new Object
-
-  private val cleanupOrLoadMutex = new Object
-
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
-
+  private val cleanupOrLoadMutex = new Object
   private val shuttingDown = new AtomicBoolean(false)
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
 
@@ -120,7 +117,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   )
 
   private def deleteExpiredOffsets() {
-    debug("Garbage collecting expired offsets.")
+    debug("Collecting expired offsets.")
     val startMs = SystemTime.milliseconds
 
     val numExpiredOffsetsRemoved = cleanupOrLoadMutex synchronized {
@@ -169,7 +166,7 @@ class OffsetManager(val config: OffsetManagerConfig,
       }.sum
     }
 
-    debug("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
+    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
   }
 
@@ -440,16 +437,20 @@ class OffsetManager(val config: OffsetManagerConfig,
    * that partition.
    * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
    */
-  def clearOffsetsInPartition(offsetsPartition: Int) {
-    debug("Deleting offset entries belonging to [%s,%d].".format(OffsetManager.OffsetsTopicName, offsetsPartition))
-
+  def removeOffsetsFromCacheForPartition(offsetsPartition: Int) {
+    var numRemoved = 0
     followerTransitionLock synchronized {
       offsetsCache.keys.foreach { key =>
         if (partitionFor(key.group) == offsetsPartition) {
          offsetsCache.remove(key)
+          numRemoved += 1
        }
      }
    }
+
+    if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
+      .format(numRemoved, TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)))
+
   }
 
   def shutdown() {
-- 
1.7.12.4
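The third patch also has removeOffsetsFromCacheForPartition count what it drops during a leader-to-follower transition and log at info level only when the count is non-zero. A small sketch of that count-then-log pattern, with a simplified partitionFor (hash of the group modulo the number of offsets partitions) standing in for the real routing logic; all names below are illustrative assumptions, not the actual Kafka code:

    // Illustrative sketch only: the count-then-log pattern used by
    // removeOffsetsFromCacheForPartition in patch 3. partitionFor here is a simplified
    // stand-in; the real method also holds followerTransitionLock while it mutates the cache.
    object FollowerTransitionCleanupSketch {
      private val offsetsTopicNumPartitions = 50
      private val cache = scala.collection.mutable.Map[String, Long]( // group -> committed offset
        "group-a" -> 10L, "group-b" -> 20L, "group-c" -> 30L)

      private def partitionFor(group: String): Int =
        math.abs(group.hashCode) % offsetsTopicNumPartitions

      // Remove every cached entry whose group maps to the given offsets partition and
      // report how many were dropped, logging only when the count is non-zero.
      def removeOffsetsForPartition(offsetsPartition: Int): Int = {
        var numRemoved = 0
        cache.keys.toSeq.foreach { group =>
          if (partitionFor(group) == offsetsPartition) {
            cache.remove(group)
            numRemoved += 1
          }
        }
        if (numRemoved > 0)
          println(s"Removed $numRemoved cached offsets for offsets partition $offsetsPartition")
        numRemoved
      }

      def main(args: Array[String]): Unit = {
        val partition = partitionFor("group-a")
        removeOffsetsForPartition(partition)
      }
    }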