Uploaded image for project: 'Samza'
  1. Samza
  2. SAMZA-560

StreamAppender not working after upgrading kafka producer API

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Won't Fix
    • None
    • None
    • None
    • None

    Description

      After SAMZA-227, StreamAppender is not working. Digging into it a little, still can not figure it out.

      It throws exception with

      Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
      

      Log:

      2015-02-11 12:19:15 SamzaAppMaster$ [INFO] got container id: container_1423528474084_0023_02_000001
      2015-02-11 12:19:15 KafkaSystemProducer [TRACE] Enqueueing message: log4j-log, OutgoingMessageEnvelope [systemStream=SystemStream [system=kafka2, stream=__samza_printout_task1_1_logs], keySerializerName=null, messageSerializerName=null, partitionKey=[B@155a6bd1, key=[B@155a6bd1, message=[B@635c714a].
      2015-02-11 12:19:15 KafkaSystemProducer [INFO] Creating a new producer for system kafka2.
      2015-02-11 12:19:15 ProducerConfig [INFO] ProducerConfig values: 
      	block.on.buffer.full = true
      	retry.backoff.ms = 100
      	buffer.memory = 33554432
      	batch.size = 16384
      	metrics.sample.window.ms = 30000
      	metadata.max.age.ms = 300000
      	receive.buffer.bytes = 32768
      	timeout.ms = 30000
      	max.in.flight.requests.per.connection = 1
      	metric.reporters = []
      	bootstrap.servers = [localhost:9092]
      	client.id = samza_producer-printout_task1-1-1423685955347-0
      	compression.type = none
      	retries = 2147483647
      	max.request.size = 1048576
      	send.buffer.bytes = 131072
      	acks = 1
      	reconnect.backoff.ms = 10
      	linger.ms = 0
      	metrics.num.samples = 2
      	metadata.fetch.timeout.ms = 60000
      
      2015-02-11 12:19:15 KafkaProducer [TRACE] Starting the Kafka producer
      2015-02-11 12:19:15 Metadata [DEBUG] Updated cluster metadata version 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
      2015-02-11 12:19:15 KafkaProducer [DEBUG] Kafka producer started
      2015-02-11 12:19:15 KafkaSystemProducer [DEBUG] Created a new producer for system kafka2.
      2015-02-11 12:19:15 KafkaProducer [TRACE] Requesting metadata update for topic __samza_printout_task1_1_logs.
      2015-02-11 12:19:15 Sender [DEBUG] Starting Kafka producer I/O thread.
      2015-02-11 12:20:15 KafkaSystemProducer [TRACE] Enqueueing message: log4j-log, OutgoingMessageEnvelope [systemStream=SystemStream [system=kafka2, stream=__samza_printout_task1_1_logs], keySerializerName=null, messageSerializerName=null, partitionKey=[B@609548c3, key=[B@609548c3, message=[B@68dc2bbe].
      2015-02-11 12:20:15 KafkaProducer [TRACE] Requesting metadata update for topic __samza_printout_task1_1_logs.
      

      Also tested configuring two kafka systems in one job, which worked.

      Really can not figure it out why the KafkaProducer created through the StreamAppender does not work.

      Any ideas?

      Attachments

        Issue Links

          Activity

            People

              navina Navina Ramesh
              closeuris Yan Fang
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: