Description
In ClientUtilTopicMetadataStore, samza-kafka uses ClientUtils.fetchTopicMetadata to fetch topic metadata. This method is deprecated in Kafka. It relies on SyncProducer and may cause problems.
// code placeholder
def getTopicInfo(topics: Set[String]) = {
  val currCorrId = corrID.getAndIncrement
  val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)

  if (response.correlationId != currCorrId) {
    throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId))
  }

  response.topicsMetadata
    .map(metadata => (metadata.topic, metadata))
    .toMap
}
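One possible direction, sketched here only as an illustration, is to fetch partition metadata through the newer Java consumer API instead of ClientUtils. This assumes kafka-clients 0.9 or later is on the classpath. The object name ConsumerTopicMetadata and the bootstrapServers parameter are hypothetical and not part of samza-kafka. The return type also differs from the original: it yields PartitionInfo lists rather than TopicMetadata, so callers would need adapting.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical helper, not part of samza-kafka.
object ConsumerTopicMetadata {

  // Returns the partition metadata for each requested topic.
  // Assumption: bootstrapServers is a "host:port,host:port" broker list.
  def getTopicInfo(topics: Set[String], bootstrapServers: String): Map[String, Seq[PartitionInfo]] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    // The consumer is used only for metadata lookups; it never subscribes or polls.
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      topics.map { topic =>
        // Guard against a null result, which some client versions return for unknown topics.
        val partitions = Option(consumer.partitionsFor(topic)).map(_.asScala.toList).getOrElse(Nil)
        topic -> partitions
      }.toMap
    } finally {
      consumer.close()
    }
  }
}
```

With this approach the client matches requests to responses internally, so the manual correlation ID check from the original method would no longer be needed. Creating a consumer per call is kept here for brevity; a real store would more likely reuse one instance.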