SAMZA-1658

samza-kafka module uses a deprecated Kafka method to get topic metadata


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.12.0, 0.13.0, 0.14.0, 0.13.1
    • Fix Version/s: 0.15.0
    • Component/s: kafka
    • Labels: None

    Description

      In ClientUtilTopicMetadataStore, samza-kafka uses ClientUtils.fetchTopicMetadata to get topic metadata. This method is deprecated in Kafka. It relies on SyncProducer and may cause problems.

      // Current implementation in ClientUtilTopicMetadataStore, which calls the deprecated method
      def getTopicInfo(topics: Set[String]) = {
        val currCorrId = corrID.getAndIncrement
        val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
      
        if (response.correlationId != currCorrId) {
          throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId))
        }
      
        response.topicsMetadata
          .map(metadata => (metadata.topic, metadata))
          .toMap
      }
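
      One possible direction, sketched here only as an illustration and not as the fix that was actually committed, is to fetch partition metadata through the newer consumer client (`KafkaConsumer.partitionsFor`) instead of `ClientUtils.fetchTopicMetadata`. The object name `TopicMetadataSketch`, the helper `toScala`, and the parameter `bootstrapServers` are hypothetical. The return type also differs from the original (`PartitionInfo` per topic rather than `TopicMetadata`), so callers would need adapting.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical sketch: names below are illustrative, not taken from Samza.
object TopicMetadataSketch {

  // Convert the Java list returned by the client into a Scala Seq.
  // A null result is treated as "no partitions known" (defensive assumption).
  def toScala(infos: java.util.List[PartitionInfo]): Seq[PartitionInfo] =
    Option(infos).map(_.asScala.toList).getOrElse(Nil)

  // Look up partition metadata for each topic using the consumer client
  // rather than the deprecated ClientUtils.fetchTopicMetadata.
  def getTopicInfo(topics: Set[String],
                   bootstrapServers: String,
                   clientId: String): Map[String, Seq[PartitionInfo]] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("client.id", clientId)
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      topics.map(topic => topic -> toScala(consumer.partitionsFor(topic))).toMap
    } finally {
      consumer.close() // always release the network resources
    }
  }
}
```

      With this approach the client handles request/response matching internally, so the manual correlation-ID check from the original method is not needed in the sketch.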
      


          People

            Assignee: Unassigned
            Reporter: sailingYang