diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index 9159b79188..6df38fc812 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -68,16 +68,18 @@ class ZkMetadataCache( extends MetadataCache with ZkFinalizedFeatureCache with Logging { private val partitionMetadataLock = new ReentrantReadWriteLock() - //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock) - //replace the value with a completely new one. this means reads (which are not under any lock) need to grab - //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. - //multiple reads of this value risk getting different snapshots. - @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot( + + private val EMPTY_METADATA = MetadataSnapshot( partitionStates = mutable.AnyRefMap.empty, topicIds = Map.empty, controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty) + //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock) + //replace the value with a completely new one. this means reads (which are not under any lock) need to grab + //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. + //multiple reads of this value risk getting different snapshots. + @volatile private var metadataSnapshot: MetadataSnapshot = EMPTY_METADATA this.logIdent = s"[MetadataCache brokerId=$brokerId] " private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) @@ -456,6 +458,7 @@ class ZkMetadataCache( metadataSnapshot = MetadataSnapshot(partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes) } + metadataSnapshot = EMPTY_METADATA deletedPartitions } }