  Samza / SAMZA-1371

Some Samza Containers get stuck at "Starting BrokerProxy for hostname:portnum" while others seem to be fine

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 0.11.0, 0.12.0
    • Fix Version/s: None
    • Component/s: container
    • Labels:
      None
    • Environment:

      Samza version: 0.11, 0.12
      Kafka version: 0.11.0.0

      Description

      We have multiple Samza apps that use a local store and hit this issue. Some containers get stuck on "Starting BrokerProxy for hostname:portnum" while others work as expected.

      Here is the log:

      stuck:
      ```
      [...]
      2017-07-25 17:11:26.546 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
      2017-07-25 17:11:26.547 [main] org.apache.samza.system.kafka.GetOffset [INFO] Validating offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]
      2017-07-25 17:11:26.648 [main] org.apache.samza.system.kafka.GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]. Using it to instantiate consumer.
      2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Starting BrokerProxy for hostname:portnum
      // it's dead, Jim
      ```
      healthy:
      ```
      [...]
      2017-07-25 17:11:26.920 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
      2017-07-25 17:11:26.921 [main] org.apache.samza.system.kafka.GetOffset [INFO] Validating offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]
      2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]. Using it to instantiate consumer.
      2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Starting BrokerProxy for hostname:portnum
      2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
      2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
      2017-07-25 17:11:29.239 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedChannelException
      2017-07-25 17:11:29.244 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [WARN] Restarting consumer due to java.nio.channels.ClosedChannelException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
      2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Abdicating for [prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1]
      2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Refreshing brokers for: Map([prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1] -> 13572)
      2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
      2017-07-25 17:11:29.247 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
      2017-07-25 17:11:29.248 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
      2017-07-25 17:11:29.265 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
      2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
      2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
      2017-07-25 17:11:29.523 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
      2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
      2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
      2017-07-25 17:11:29.601 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
      2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down BrokerProxy for hostname:portnum
      2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] closing simple consumer...
      2017-07-25 17:11:29.663 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to interrupt.
      2017-07-25 17:11:29.668 [main] org.apache.samza.container.SamzaContainer [INFO] Starting host statistics monitor
      2017-07-25 17:11:29.670 [main] org.apache.samza.container.SamzaContainer [INFO] Registering task instances with producers.
      2017-07-25 17:11:29.674 [main] org.apache.samza.container.SamzaContainer [INFO] Starting producer multiplexer.
      2017-07-25 17:11:29.675 [main] org.apache.samza.container.SamzaContainer [INFO] Initializing stream tasks.
      2017-07-25 17:11:29.676 [main] com.company.samza.app.companyStreamingAppWrapper [INFO] Initializing instance of streaming application
      2017-07-25 17:11:29.681 [main] com.company.samza.app.companyStreamingAppWrapper [INFO] First initialization. Setting up Guice container with configuration companyStreamingAppWrapperConfiguration

      {company.app.name=AlertsOrganizerInstant, company.appgroup=aws, company.env=prod, company.guice.module=com.company.notifications.Alerts.organizer..AlertsOrganizerModule}

      2017-07-25 17:11:30.118 [main] com.company.config.guice.configModule [INFO] configModule loaded requested override file '/storage/data/secure/config/AnalyticsServiceClient.cfg'
      2017-07-25 17:11:30.480 [main] com.company.samza.dataService.SamzaSessionFactoriesModule [INFO] Loading prod dbConfig from /data/config/prod.database.properties
      // Hibernate stuff (i.e. our code is hit)
      ```

        Activity

        akkaul Ak Ka added a comment -

        we’ve narrowed the indefinite hanging down to this function:
        ```
        private def restoreStores() {
          debug("Restoring stores.")

          for ((storeName, store) <- taskStoresToRestore) {
            if (changeLogSystemStreams.contains(storeName)) {
              val systemStream = changeLogSystemStreams(storeName)
              val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
              val systemConsumer = storeConsumers(storeName)
              val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition)
              store.restore(systemConsumerIterator)
            }
          }
        }
        ```
        which lives in `TaskStorageManager.scala`.

        The call stack that leads to this call is:
        ```
        SamzaContainer -> run()
        TaskInstance -> startStores()
        TaskStorageManager -> init()
        -> restoreStores()
        ```

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Ak Ka, thanks for reporting this issue. It seems that the consumer instance that reads the changelog topic keeps dying while the KV store is being restored. The code retries the connection to the Kafka broker when it gets disconnected, and the consumer's initial poll tells the container that there are more messages at the broker. Hence, I would recommend digging into what is causing the Kafka consumer to die.

        Secondly, are you asking for the container to fail after a maximum number of retries when an exception occurs while recovering the KV store?
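
        For illustration only, here is a rough sketch of what such a bounded-retry policy around store restoration could look like; the RestoreRetryPolicySketch object, the restoreWithRetries helper, and the maxRestoreRetries parameter are hypothetical and not part of Samza:
        ```
        import org.apache.samza.SamzaException

        object RestoreRetryPolicySketch {
          // Hypothetical: fail after a fixed number of restore attempts instead of retrying forever.
          def restoreWithRetries(restore: () => Unit, maxRestoreRetries: Int = 3): Unit = {
            var attempt = 0
            var done = false
            while (!done) {
              try {
                restore()
                done = true
              } catch {
                case e: Exception =>
                  attempt += 1
                  if (attempt >= maxRestoreRetries) {
                    // Give up and surface the failure rather than leaving the container stuck.
                    throw new SamzaException("Store restore failed after " + attempt + " attempts", e)
                  }
              }
            }
          }
        }
        ```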

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Ak Ka, any updates on the investigation? Is there anything unclear in my last comment? Do you need further help? Thanks!

        hao.song Hao Song added a comment -

        I verified that this also happens on Samza 0.12 and Kafka 0.11.

        To reply to your original question from 07/27: we would like more insight into why there is an exception, including a stack trace. Unfortunately, right now there is no log at all indicating that there is an issue. Can you add some verbose logging around that part of the code?

        Thanks!

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, thanks! It would be good to get the logs to understand the behavior of the Kafka consumer. One clarification: so far Samza is only officially tested/validated against Kafka 0.10.1.1 (i.e. Samza uses Kafka client version 0.10.1.1 in 0.12+). So, when you say the environment is Samza 0.12 + Kafka 0.11, which of the following are you referring to?

        1. You have excluded the Kafka client 0.10.1.1 version from Samza and depend directly on Kafka client 0.11 in your Samza application build
        2. You build and package your Samza application with Kafka client 0.10.1.1 (which is included from the Samza packages) and run the application against a Kafka cluster with brokers on version 0.11

        Which configuration are you referring to?

        hao.song Hao Song added a comment - edited

        I'm referring to #2, although I believe Samza 0.12 is using Kafka client 0.10.0.1 according to https://mvnrepository.com/artifact/org.apache.samza/samza-kafka_2.10/0.12.0.
        So I'm using 0.10.0.1.

        Are you suggesting I should change it to 0.10.1.1?

        Also, I tested with Samza 0.11 as well, which uses Kafka client 0.8.2.x.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, regarding more logging around consumer failures: you can set the logging level for the BrokerProxy class to DEBUG to see more detailed error info (see the code snippet below):

                  (exception, loop) => {
                    warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
                    debug("Exception detail:", exception)
                    abdicateAll
                    reconnect = true
                  })
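
        As a sketch of how to do that (assuming the job uses log4j 1.x, which the samza-log4j module provided at the time), the change amounts to setting the logger for org.apache.samza.system.kafka.BrokerProxy to DEBUG, e.g. log4j.logger.org.apache.samza.system.kafka.BrokerProxy=DEBUG in a properties-based configuration, or programmatically:
        ```
        import org.apache.log4j.{Level, Logger}

        // Raise only the BrokerProxy logger to DEBUG so the "Exception detail:" line above
        // is emitted with a full stack trace, without making the whole container verbose.
        Logger.getLogger("org.apache.samza.system.kafka.BrokerProxy").setLevel(Level.DEBUG)
        ```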
        

        And one more question from my original comment: are you asking for a feature that configures the maximum number of retries for the consumer to reconnect to the Kafka cluster?

        Thanks!

        -Yi

        hao.song Hao Song added a comment - edited

        I don't think a consumer failure caused the stuck container, because we are not getting any log output like the one you pasted above (i.e. "Restarting consumer due to...") in the stuck container. You probably mixed up the "healthy" log with the "stuck" log in the description section.

        hao.song Hao Song added a comment -

        I just talked to my teammate who originally looked at this. It seems that the container got stuck after entering this function (restoreStores). We would like more verbose logging in the method than just "Restoring stores."
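
        For illustration, a sketch (not an actual Samza patch) of the kind of per-store logging being asked for, added around the store.restore call from the snippet above:
        ```
        for ((storeName, store) <- taskStoresToRestore) {
          if (changeLogSystemStreams.contains(storeName)) {
            val systemStream = changeLogSystemStreams(storeName)
            val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
            val systemConsumer = storeConsumers(storeName)
            val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition)
            // Hypothetical extra logging: say which store/changelog partition is being restored
            // and when it finishes, so a hang can be attributed to a specific partition.
            info("Restoring store %s from changelog partition %s" format (storeName, systemStreamPartition))
            store.restore(systemConsumerIterator)
            info("Finished restoring store %s" format storeName)
          }
        }
        ```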

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, apologies, I did miss the small "stuck/healthy" labels on the logs. Can you take a stack dump of the stuck container?
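
        (As an aside on mechanics: jstack <pid> or kill -3 <pid> against the container JVM is the usual way to get this; if attaching to the container process is awkward, a generic JVM snippet like the following, which is not a Samza API, can capture roughly the same information from inside the process.)
        ```
        import scala.collection.JavaConverters._

        object ThreadDumpSketch {
          // Capture all thread names, states and stack traces as a single string,
          // roughly what jstack would print for the stuck container.
          def dumpAllThreads(): String =
            Thread.getAllStackTraces.asScala.map { case (thread, frames) =>
              "\"" + thread.getName + "\" state=" + thread.getState + "\n" +
                frames.map(frame => "    at " + frame).mkString("\n")
            }.mkString("\n\n")
        }
        ```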

        hao.song Hao Song added a comment -

        Thanks for the reply, Yi! I will make sure we get that whenever we repro this issue again (it usually happens when we restart our Samza job with a local store).

        hao.song Hao Song added a comment - edited

        Okay. I was able to repro this problem.
        Below is the stack dump:
        "main" #1 prio=5 os_prio=0 tid=0x00007f3ff4026800 nid=0x15b5 waiting on condition [0x00007f3ffbd56000]
        java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)

          - parking to wait for <0x00000000bdcf65d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
          at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
          at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
          at org.apache.samza.util.BlockingEnvelopeMap.poll(BlockingEnvelopeMap.java:140)
          at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
          at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
          at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:104)
          at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:187)
          at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:181)
          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
          at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
          at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:181)
          at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:76)
          at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:107)
          at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:830)
          at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:828)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
          at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:828)
          at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:719)
          at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:122)
          at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:89)
          at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

        Locked ownable synchronizers:
          - None

        Which points to the restoreStores function in TaskStorageManager. Specifically, it got stuck at this line: https://github.com/apache/samza/blob/0.12.0/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala#L187
        I wish there were more logs, but it never reaches the next function, stopConsumers(), on line 192, which has another debug statement.
        hao.song Hao Song added a comment -

        Can we get an update on this? Thanks.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song thanks for the stack dump. Taking a look now.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, so, from the stack trace it looks like the restoration process is stuck at SystemStreamPartitionIterator#refresh, which invokes the KafkaSystemConsumer#poll method. At that point it determines that there are still messages in the partition at the broker, but it was not able to get any from the queue, which should be populated by the BrokerProxy thread that keeps fetching messages from the broker. Could you post the full log from the stuck container as well? I want to see whether there were any log lines for the BrokerProxy thread before this happened.

        P.S. We have not tested against Kafka broker 0.11. It could be that the broker responds differently in 0.11 than in 0.10.1, causing the BrokerProxy thread to die.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, in your last stack dump, did you see any thread with a name containing "BROKER-PROXY-"? From your original log, I realized that the broker proxy thread may not have been started successfully.

        hao.song Hao Song added a comment - edited

        Yes. As you can see from the description, the last line of the problematic log is
        "2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy [INFO] Starting BrokerProxy for hostname:portnum"

        To answer your previous question: I also downgraded the Kafka brokers to 0.10.0.1 (the exact same version as the Samza 0.12 Kafka client). We still have the same problem.

        You asked about posting the full log. Are you interested in the INFO log or the DEBUG log? The DEBUG log is too big to be uploaded; if you want it, I will have to store it in S3 and point you to a link.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, yeah, that's interesting... Did you get a chance to check the thread dump for thread names with "BROKER-PROXY-" as a prefix? Could you attach the full stack dump file here?

        hao.song Hao Song added a comment - edited

        [Attached thread dump]

        hao.song Hao Song added a comment -

        Can I get an update, Yi Pan (Data Infrastructure)?

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, sorry for the delay. I will take a closer look at the stack trace. At first glance, it seems like all BROKER-PROXY threads are waiting for incoming messages. Let me read it a bit more. The next step would be getting some JMX dumps to see what is causing the stuck condition.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Hao Song, I chatted with Andrea when he was here for the meetup. He mentioned that the "hanging containers" recovered when you triggered a re-balance on the Kafka broker (version 0.11). I want to clarify the test scenarios that you went through so that I can consult with our Kafka team to understand whether this is a known issue between the Kafka 0.11 broker and older consumers. Hence, I am listing the following combinations; please let me know which ones you have tested and whether the problem exists or not:

        1. Samza 0.11, Kafka 0.11
        2. Samza 0.12, Kafka 0.11
        3. Samza 0.13, Kafka 0.11
        4. Samza 0.11, Kafka 0.10.2
        5. Samza 0.12, Kafka 0.10.2
        6. Samza 0.13, Kafka 0.10.2
        7. Samza 0.11, Kafka 0.10.1
        8. Samza 0.12, Kafka 0.10.1
        9. Samza 0.13, Kafka 0.10.1

        Thanks!


          People

          • Assignee:
            nickpan47 Yi Pan (Data Infrastructure)
            Reporter:
            akkaul Ak Ka
          • Votes:
            0
            Watchers:
            3
