Samza / SAMZA-723

hello-samza hangs when we use StreamAppender

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.0
    • Component/s: hello-samza
    • Labels: None

      Description

      I added a StreamAppender to log4j.xml, set task.log4j.system=kafka in the config, and added a simple log line in the process() call to see if it creates the logging event correctly.
      When I deploy the job (WikipediaFeedStreamTask.java), the AppMaster seems to just hang in the bootstrap method. I don't get any exceptions in the AppMaster logs. I checked the Kafka logs and found some socket-closed errors in the stack trace.

      ==> deploy/kafka/logs/kafka.log <==
      [2015-06-25 16:48:01,179] ERROR Closing socket for /172.21.132.57 because of error (kafka.network.Processor)
      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
      at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
      at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
      at sun.nio.ch.IOUtil.write(IOUtil.java:65)
      at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
      at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
      at kafka.network.MultiSend.writeTo(Transmission.scala:101)
      at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
      at kafka.network.Processor.write(SocketServer.scala:472)
      at kafka.network.Processor.run(SocketServer.scala:342)
      at java.lang.Thread.run(Thread.java:745)

      ==> deploy/kafka/logs/server.log <==
      [2015-06-25 16:48:01,179] ERROR Closing socket for /172.21.132.57 because of error (kafka.network.Processor)
      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
      at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
      at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
      at sun.nio.ch.IOUtil.write(IOUtil.java:65)
      at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
      at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
      at kafka.network.MultiSend.writeTo(Transmission.scala:101)
      at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
      at kafka.network.Processor.write(SocketServer.scala:472)
      at kafka.network.Processor.run(SocketServer.scala:342)
      at java.lang.Thread.run(Thread.java:745)

      Need to investigate this issue further. It looks like the CoordinatorStream is conflicting with the StreamAppender components.

      Attachments

      1. SAMZA-723.1.patch
        15 kB
        Yan Fang
      2. SAMZA-723-2.patch
        13 kB
        Navina Ramesh
      3. SAMZA-723-asynAppender.patch
        2 kB
        Yan Fang


          Activity

          navina Navina Ramesh added a comment -

          Committed. Thanks!

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          +1. Thanks!

          navina Navina Ramesh added a comment -

           Merged Yan's patch.
           Here is the RB: https://reviews.apache.org/r/39464/

          nickpan47 Yi Pan (Data Infrastructure) added a comment - edited

          Hi, Yan Fang, sorry for the late reply on this. I agree that we can use this patch as a temporary fix. Please feel free to check it in.

          Thanks!

          closeuris Yan Fang added a comment -

          So what do you think of this fix? I cannot understand why it works (and also why it worked in the previous version), but if this patch works in everyone's environment, maybe we can use it as a temporary fix and revisit it when possible.

          closeuris Yan Fang added a comment -

          The thing is that, after setting the recursive flag, it does not happen. So I suspect this may be a race condition, but I still have not pinned it down.

          closeuris Yan Fang added a comment -

          "Even with the recursive flag in StreamAppender, how does it avoid the deadlock scenario between the sender thread and the main thread on the RootLogger monitor?"

          You are right; that is why the fix does not convince me. (If you try the patch, it does fix the problem, though.) On the other hand, this blocking should also have happened in the previous version if this "deadlock" was a problem.

          Another thing worth mentioning is that there are actually two producer-network-threads in the stack trace; only one of them is blocked, while the other is working (RUNNABLE state).

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Hi, Yan Fang, looking further at the original stack trace, I realized that the "deadlock" is not actually forever, since the waitOnMetadata() call is a TIMED_WAIT. However, I still couldn't figure out the second part of the problem: even with the recursive flag in StreamAppender, how does it avoid the deadlock scenario between the sender thread and the main thread on the RootLogger monitor? The potential locking problem is:

          Category.callAppender()
          -> lock on RootLogger
          -> AppenderAttachableImpl.appendLoopOnAppenders()
          -> lock on StreamAppender
          -> AppenderSkeleton.doAppend()
          -> StreamAppender.append()
          -> ...
          -> Metadata.awaitUpdate()

          And the sender thread does not even enter the StreamAppender.append() method while blocking:
          org.apache.kafka.clients.producer.internals.Sender.run()
          -> Category.callAppender()
          -> blocked on lock for RootLogger

          It seems that this would almost always happen the first time logging via StreamAppender is triggered while the KafkaProducer is trying to fetch the topic metadata from the broker via a blocking call.
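
          A minimal, hypothetical Java sketch of this lock ordering (all class, field, and thread names below are illustrative; this is not Samza or Kafka code):

          public class AppenderLockOrderSketch {
              private static final Object ROOT_LOGGER_MONITOR = new Object();
              private static volatile boolean metadataReady = false;

              // Stand-in for Category.callAppenders() -> AppenderSkeleton.doAppend() -> StreamAppender.append()
              static void log(String msg) {
                  synchronized (ROOT_LOGGER_MONITOR) {
                      appendToStream(msg);
                  }
              }

              // Stand-in for KafkaProducer.partitionsFor() -> Metadata.awaitUpdate():
              // a bounded wait (TIMED_WAIT in the real trace), not an infinite one.
              static void appendToStream(String msg) {
                  long deadline = System.currentTimeMillis() + 10_000;
                  while (!metadataReady && System.currentTimeMillis() < deadline) {
                      try {
                          Thread.sleep(100);
                      } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                          return;
                      }
                  }
              }

              public static void main(String[] args) throws InterruptedException {
                  Thread sender = new Thread(() -> {
                      log("sender starting");   // like Sender.run(), it logs first and blocks on the RootLogger monitor...
                      metadataReady = true;     // ...so the metadata is not published while the main thread holds the monitor
                  }, "kafka-producer-network-thread");
                  sender.setDaemon(true);
                  sender.start();

                  log("first log line from the main thread"); // holds the monitor for the full metadata timeout
                  sender.join();
              }
          }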

          closeuris Yan Fang added a comment -

          RB: https://reviews.apache.org/r/37428/

          So there are actually two problems:

          1. Infinite loop in queue.poll.

          Reason:
          When we initialize the StreamAppender (at the beginning of the AM main), we create the JobCoordinator, which bootstraps the coordinator stream. However, since the BrokerProxy thread is waiting for logging to be initialized, it never reads the messages and so never sets isAtHead. The bootstrap goes into an infinite loop because the coordinator stream never reads to the head.

          Fix:
          Only initialize the StreamAppender after the JobCoordinator is instantiated. This will miss the logs generated while the JobCoordinator is being constructed, which is acceptable because we would miss those logs anyway. (If we want, we can fix that by temporarily buffering those logs in a list and sending them after the JobCoordinator is instantiated; we can open a new ticket for this.) This fix also avoids creating two HTTP servers: otherwise the StreamAppender creates one HTTP server (as a side effect of instantiating the JobCoordinator) and the AM creates another.

          2. Deadlock between the producer thread and the main thread.

          I am actually still looking into the root cause, because I still cannot convince myself why the fix works...

          We were using a thread-local variable to avoid recursively sending logs. (The recursion is: "append" calls "send", "send" logs, which calls "append" again, which calls "send"...) Previously, the producer ran in the same thread as the main thread, so the logs emitted inside "send" are not sent again (the recursiveCalled variable is set to true). However, since Kafka 0.8.2 the new producer runs in another thread, so the thread-local variable does not affect the producer thread. As a result, the producer thread's logs call "append" twice (the first time the producer thread's thread-local variable is false; the second time it becomes true). But I am not sure why this causes the deadlock, or why it does not cause a deadlock after the fix... Any thoughts?
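
          A minimal, hypothetical sketch of the thread-local guard described above (class and field names are illustrative, not the actual Samza code). It only protects against re-entry on the same thread; a log line emitted by a separate sender thread still reaches send(), which is the limitation noted above for the post-0.8.2 producer:

          import org.apache.log4j.AppenderSkeleton;
          import org.apache.log4j.spi.LoggingEvent;

          public class RecursionGuardSketch extends AppenderSkeleton {
              private static final ThreadLocal<Boolean> recursiveCall =
                      ThreadLocal.withInitial(() -> Boolean.FALSE);

              @Override
              protected void append(LoggingEvent event) {
                  if (recursiveCall.get()) {
                      return; // drop log lines produced while this thread is already inside send()
                  }
                  recursiveCall.set(Boolean.TRUE);
                  try {
                      send(event); // may itself log, which re-enters append() on the same thread
                  } finally {
                      recursiveCall.set(Boolean.FALSE);
                  }
              }

              private void send(LoggingEvent event) {
                  // Stand-in for the Kafka producer send. With the new (0.8.2+) producer, the actual
                  // network I/O and its logging happen on a separate sender thread that this
                  // per-thread flag cannot see.
              }

              @Override
              public void close() { }

              @Override
              public boolean requiresLayout() { return false; }
          }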

          But anyway, feel free to review the RB and test it. It works in my environment. Thank you.

          closeuris Yan Fang added a comment -

          Quick update: fixed the infinite loop. Now I finally see the deadlock pointed out by Navina Ramesh. Will work on this.

          closeuris Yan Fang added a comment -

          Just one update:

          In my stack trace, it is not a deadlock. The code is stuck in the loop.

          Let me fix this and see if it solves the problem.

          closeuris Yan Fang added a comment -

          Hi Navina Ramesh, this stack trace is what I got when using both the async and the sync appender. Obviously, both are waiting in the coordinatorSystemConsumer.bootstrap part.

          To use the AsyncAppender, I tried two ways:

          1. Add the following to log4j.xml:

          <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender">
             <layout class="org.apache.log4j.PatternLayout">
               <param name="ConversionPattern" value="%X{containerName} %X{jobName} %X{jobId} %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
             </layout>
          </appender>
          
           <appender name="async" class="org.apache.log4j.AsyncAppender">
                  <param name="BufferSize" value="500"/>
                  <appender-ref ref="StreamAppender"/>
          </appender>
          
           <root>
              <priority value="debug" />
              <appender-ref ref="async"/>
          </root>
          

          2. Use the patch, and use:

          <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender">
             <layout class="org.apache.log4j.PatternLayout">
               <param name="ConversionPattern" value="%X{containerName} %X{jobName} %X{jobId} %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
             </layout>
          </appender>
          

          You could try these to see if they change your stack trace. Thank you.

          navina Navina Ramesh added a comment -

          Hmm... Not sure. :/ I don't think separating the class loaders for the app and the framework will help, will it? (SAMZA-697)

          Can you put up the patch with the AsyncAppender? I will also give it a try.

          navina Navina Ramesh added a comment -

          Yan Fang: Is this the stack trace with the AsyncAppender configured?

          closeuris Yan Fang added a comment -

          Using the AsyncAppender does not fix my issue. Any thoughts?

          closeuris Yan Fang added a comment -

          Hmm, I looked at the stack trace and got a somewhat different result:

          "main" prio=5 tid=0x00007fcc2a001000 nid=0x1903 waiting on condition [0x0000000105b5e000]
             java.lang.Thread.State: TIMED_WAITING (parking)
          	at sun.misc.Unsafe.park(Native Method)
          	- parking to wait for  <0x00000007f1bfe270> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
          	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
          	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
          	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
          	at org.apache.samza.util.BlockingEnvelopeMap.poll(BlockingEnvelopeMap.java:135)
          	at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:80)
          	at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:57)
          	at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.bootstrap(CoordinatorStreamSystemConsumer.java:141)
          	at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:68)
          	at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:77)
          	at org.apache.samza.coordinator.JobCoordinator.apply(JobCoordinator.scala)
          	at org.apache.samza.logging.log4j.StreamAppender.getConfig(StreamAppender.java:183)
          	at org.apache.samza.logging.log4j.StreamAppender.activateOptions(StreamAppender.java:94)
          	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
          	at org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
          	at org.apache.log4j.xml.DOMConfigurator.findAppenderByName(DOMConfigurator.java:176)
          	at org.apache.log4j.xml.DOMConfigurator.findAppenderByReference(DOMConfigurator.java:191)
          	at org.apache.log4j.xml.DOMConfigurator.parseChildrenOfLoggerElement(DOMConfigurator.java:523)
          	at org.apache.log4j.xml.DOMConfigurator.parseRoot(DOMConfigurator.java:492)
          	- locked <0x00000007f4090ec8> (a org.apache.log4j.spi.RootLogger)
          	at org.apache.log4j.xml.DOMConfigurator.parse(DOMConfigurator.java:1001)
          	at org.apache.log4j.xml.DOMConfigurator.doConfigure(DOMConfigurator.java:867)
          	at org.apache.log4j.xml.DOMConfigurator.doConfigure(DOMConfigurator.java:773)
          	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:483)
          	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
          	at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
          	- locked <0x00000007f40910e8> (a org.slf4j.impl.Log4jLoggerFactory)
          	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
          	at org.apache.samza.util.Logging$class.logger(Logging.scala:27)
          	at org.apache.samza.job.yarn.SamzaAppMaster$.logger$lzycompute(SamzaAppMaster.scala:56)
          	- locked <0x00000007f4091110> (a org.apache.samza.job.yarn.SamzaAppMaster$)
          	at org.apache.samza.job.yarn.SamzaAppMaster$.logger(SamzaAppMaster.scala:56)
          	at org.apache.samza.util.Logging$class.info(Logging.scala:54)
          	at org.apache.samza.job.yarn.SamzaAppMaster$.info(SamzaAppMaster.scala:56)
          	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:64)
          	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
          
          "SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 10.151.99.45:9092 for client samza_consumer-simple_task0715-1-1438649236052-1" daemon prio=5 tid=0x00007fcc298ad800 nid=0x6003 in Object.wait() [0x000000010e96a000]
             java.lang.Thread.State: RUNNABLE
          	at org.apache.log4j.LogManager.getLoggerRepository(LogManager.java:196)
          	at org.apache.log4j.LogManager.getLogger(LogManager.java:228)
          	at org.apache.log4j.Logger.getLogger(Logger.java:104)
          	at kafka.utils.Logging$class.logger(Logging.scala:24)
          	at kafka.network.BoundedByteBufferSend.logger$lzycompute(BoundedByteBufferSend.scala:26)
          	- locked <0x00000007f202b150> (a kafka.network.BoundedByteBufferSend)
          	at kafka.network.BoundedByteBufferSend.logger(BoundedByteBufferSend.scala:26)
          	at kafka.utils.Logging$class.trace(Logging.scala:35)
          	at kafka.network.BoundedByteBufferSend.trace(BoundedByteBufferSend.scala:26)
          	at kafka.network.Send$class.writeCompletely(Transmission.scala:76)
          	at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
          	at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
          	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:70)
          	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
          	- locked <0x00000007f1d27240> (a java.lang.Object)
          	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
          	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
          	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
          	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
          	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
          	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
          	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
          	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
          	at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
          	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176)
          	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
          	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
          	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
          	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
          	at java.lang.Thread.run(Thread.java:745)
          
          

          There is no specific lock here... I cannot even find the kafka-producer-network-thread...

          navina Navina Ramesh added a comment -

          Correction: AsyncAppender is a wrapper class. I think it can be configured to wrap any appender, so that the wrapped appender is written to in async mode.

          navina Navina Ramesh added a comment -

          After investigating this issue yesterday, it seems there is a kind of deadlock between the SamzaAppMaster and the Kafka producer thread.
          Here is the relevant part of the stack trace:

          "Attach Listener" #11 daemon prio=9 os_prio=31 tid=0x00007fb9789a2000 nid=0x3d0b waiting on condition [0x0000000000000000]
             java.lang.Thread.State: RUNNABLE
          
          "kafka-producer-network-thread | samza_producer-wikipedia_feed-1-1435630222452-0" #10 daemon prio=5 os_prio=31 tid=0x00007fb979428800 nid=0x5507 waiting for monitor entry [0x000000012467d000]
             java.lang.Thread.State: BLOCKED (on object monitor)
          	at org.apache.log4j.Category.callAppenders(Category.java:205)
          	- waiting to lock <0x00000007b2588fc0> (a org.apache.log4j.spi.RootLogger)
          	at org.apache.log4j.Category.forcedLog(Category.java:391)
          	at org.apache.log4j.Category.log(Category.java:856)
          	at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:209)
          	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:117)
          	at java.lang.Thread.run(Thread.java:745)
          
          "main" #1 prio=5 os_prio=31 tid=0x00007fb979808000 nid=0x1903 in Object.wait() [0x000000010e72a000]
             java.lang.Thread.State: TIMED_WAITING (on object monitor)
          	at java.lang.Object.wait(Native Method)
          	- waiting on <0x00000007b016ea48> (a org.apache.kafka.clients.producer.internals.Metadata)
          	at org.apache.kafka.clients.producer.internals.Metadata.awaitUpdate(Metadata.java:107)
          	- locked <0x00000007b016ea48> (a org.apache.kafka.clients.producer.internals.Metadata)
          	at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:378)
          	at org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:405)
          	at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:82)
          	at org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:130)
          	at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
          	- locked <0x00000007b2588f18> (a org.apache.samza.logging.log4j.StreamAppender)
          	at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
          	at org.apache.log4j.Category.callAppenders(Category.java:206)
          	- locked <0x00000007b2588fc0> (a org.apache.log4j.spi.RootLogger)
          	at org.apache.log4j.Category.forcedLog(Category.java:391)
          	at org.apache.log4j.Category.log(Category.java:856)
          	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:305)
          	at org.apache.samza.util.Logging$class.info(Logging.scala:55)
          	at org.apache.samza.job.yarn.SamzaAppMaster$.info(SamzaAppMaster.scala:56)
          	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:64)
          	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
          

          The kafka-producer-network-thread is blocked on the RootLogger instance (lock 0x00000007b2588fc0), which is held by the main thread (in StreamAppender). The Sender thread tries to grab the lock on the RootLogger instance when it logs for the first time, at line 117.

          One possible solution is to implement the log4j AsyncAppender interface. If we use an AsyncAppender, then the append() method will return immediately after adding messages to the queue. I'm not sure what kind of guarantee this offers for the logging data itself; we should investigate more.
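
          A hypothetical Java sketch of that idea (in practice it would be wired up in log4j.xml, as in the log4j.xml examples shown elsewhere in this thread; the calls below assume the StreamAppender already has its usual configuration available):

          import org.apache.log4j.AsyncAppender;
          import org.apache.log4j.Logger;
          import org.apache.samza.logging.log4j.StreamAppender;

          public class AsyncWrapperSketch {
              public static void main(String[] args) {
                  StreamAppender streamAppender = new StreamAppender();
                  // layout and options would be set here, as in the XML examples
                  streamAppender.activateOptions();

                  AsyncAppender async = new AsyncAppender(); // log4j 1.x wrapper appender
                  async.setBufferSize(500);                  // same buffer size as the XML example
                  async.addAppender(streamAppender);

                  Logger.getRootLogger().addAppender(async); // callers now only enqueue; a background thread appends
              }
          }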

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Just FYI, our internal test job with 0.9.1 RC1 works fine with the StreamAppender configured in log4j.xml.

          closeuris Yan Fang added a comment -

          Navina Ramesh, yes, I think it runs in the same thread as the container, because I did not create any new thread for the appender's producer. All of the system configuration happens in the activateOptions() method. If log4j itself does not create a new thread for activateOptions (I am not sure about this; I cannot find an answer by googling), then it should happen in the same thread as the container.

          "Could it be due to the StreamAppender querying the JobCoordinator while it is still bootstrapping?"

          This sounds possible, especially since the JobCoordinator itself logs, which in turn calls the log4j appender; that sounds like a deadlock...

          navina Navina Ramesh added a comment -

          Yan Fang: Does the log4j appender run in the same thread as the container, or is it a concurrent thread? I am trying to troubleshoot why bootstrapping hangs. Could it be due to the StreamAppender querying the JobCoordinator while it is still bootstrapping?


            People

            • Assignee: closeuris Yan Fang
            • Reporter: navina Navina Ramesh
            • Votes: 0
            • Watchers: 5
