Samza: SAMZA-1537

StreamAppender can deadlock due to locks held by Kafka and Log4j


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.14.0
    • Component/s: None
    • Labels: None

    Description

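      The thread dumps of the two offending threads are below, but the basics are:
      1. AppInfoParser in Kafka uses static synchronized methods.
      2. Log4j synchronizes on each Category when it calls that Category's appenders.

      So the StreamAppender can try to create a new KafkaProducer while a log4j Category lock is held; the KafkaProducer constructor calls the static synchronized AppInfoParser.registerAppInfo(), which in turn tries to log through log4j. In the dumps below, tid=23 is inside Category.callAppenders (appending via StreamAppender, so it holds the Category lock) and is blocked entering AppInfoParser.registerAppInfo. Meanwhile tid=35 is inside AppInfoParser.registerAppInfo (so it holds the AppInfoParser lock), is logging from AppInfo.<init>, and is blocked in Category.callAppenders waiting for the Category lock that tid=23 holds. Each thread waits for a lock the other holds, so neither can proceed.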

      "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=23 BLOCKED
      	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
      	   Local Variable: java.lang.String#326563
      	   Local Variable: java.lang.String#329864
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
      	   Local Variable: java.util.ArrayList#265184
      	   Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
      	   Local Variable: java.util.LinkedHashMap#991
      	   Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#9
      	   Local Variable: java.util.ArrayList#265353
      	   Local Variable: org.apache.kafka.clients.NetworkClient#9
      	   Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
      	   Local Variable: java.util.ArrayList#265374
      	   Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
      	   Local Variable: java.lang.String#309971
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
      	   Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#7
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#8
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#10
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#9
      	   Local Variable: java.util.Properties#38
      	   Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
      	   Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
      	   Local Variable: java.lang.String#326561
      	   Local Variable: java.lang.IllegalStateException#2
      	   Local Variable: java.lang.String#330077
      	   Local Variable: org.apache.samza.system.SystemProducerException#4
      	   Local Variable: java.lang.Integer#15116
      	at org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
      	at com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
      	at com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
      	   Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
      	at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
      	at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
      	   Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
      	at org.apache.log4j.Category.callAppenders(Category.java:206)
      	   Local Variable: org.apache.log4j.spi.LoggingEvent#24
      	   Local Variable: org.apache.log4j.Logger#4
      	at org.apache.log4j.Category.forcedLog(Category.java:391)
      	at org.apache.log4j.Category.log(Category.java:856)
      	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
      	   Local Variable: java.util.concurrent.TimeUnit$3#1
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
      	   Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
      	   Local Variable: java.lang.Boolean#1
      	   Local Variable: org.apache.samza.system.SystemProducerException#2
      	   Local Variable: java.lang.Object#203455
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
      	at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
      	   Local Variable: org.apache.kafka.common.errors.TimeoutException#2
      	   Local Variable: java.util.ArrayList$Itr#6
      	at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
      	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
      	   Local Variable: java.util.ArrayList#263984
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#4
      	   Local Variable: java.util.ArrayList$Itr#5
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#34
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
      	   Local Variable: org.apache.kafka.common.Cluster#2
      	   Local Variable: java.util.Collections$EmptyMap#1
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
      	   Local Variable: java.util.HashMap$KeyIterator#2
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
      	   Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
      	at java.lang.Thread.run(Thread.java:745)
      
      
      "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=35 BLOCKED
      	at org.apache.log4j.Category.callAppenders(Category.java:204)
      	   Local Variable: org.apache.log4j.spi.LoggingEvent#26
      	   Local Variable: org.apache.log4j.Logger#15
      	at org.apache.log4j.Category.forcedLog(Category.java:391)
      	at org.apache.log4j.Category.log(Category.java:856)
      	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
      	at org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
      	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
      	   Local Variable: javax.management.ObjectName#162
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
      	   Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
      	   Local Variable: java.util.ArrayList#264895
      	   Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#7
      	   Local Variable: java.lang.String#308990
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
      	   Local Variable: java.util.LinkedHashMap#854
      	   Local Variable: java.util.ArrayList#264889
      	   Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
      	   Local Variable: java.util.ArrayList#264910
      	   Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
      	   Local Variable: org.apache.kafka.clients.NetworkClient#7
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#5
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#6
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
      	   Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
      	   Local Variable: java.util.Properties#67
      	   Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
      	   Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#14
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#13
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
      	   Local Variable: org.apache.samza.system.SystemProducerException#1
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
      	   Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
      	at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
      	   Local Variable: java.util.ArrayList$Itr#2
      	   Local Variable: org.apache.kafka.common.errors.TimeoutException#1
      	at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
      	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#10
      	   Local Variable: java.util.ArrayList$Itr#1
      	   Local Variable: java.util.ArrayList#263305
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#21
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
      	   Local Variable: org.apache.kafka.common.Cluster#1
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
      	   Local Variable: java.util.HashMap$KeyIterator#1
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
      	   Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
      	at java.lang.Thread.run(Thread.java:745)
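      The cycle in the two dumps above is a classic lock-order inversion. The sketch below reproduces it with two plain monitors standing in for the AppInfoParser class lock and a log4j Category lock. This is a simplification: no Kafka or log4j code is involved, and the class and lock names are illustrative. It uses the JDK's ThreadMXBean.findMonitorDeadlockedThreads() to confirm that the JVM itself sees a deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

final class LockOrderInversionDemo {
  // Stand-in for the class monitor taken by AppInfoParser's static synchronized methods.
  static final Object APP_INFO_PARSER_LOCK = new Object();
  // Stand-in for the monitor log4j takes on a Category in callAppenders().
  static final Object CATEGORY_LOCK = new Object();

  /** Starts two daemon threads that take the locks in opposite order and
   *  returns how many threads the JVM reports as monitor-deadlocked. */
  static int runDemo() {
    CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

    // Like tid=23: logging holds the Category lock, then the appender
    // re-creates a producer, which needs the AppInfoParser lock.
    Thread logging = new Thread(() -> {
      synchronized (CATEGORY_LOCK) {
        bothHoldFirstLock.countDown();
        awaitQuietly(bothHoldFirstLock);
        synchronized (APP_INFO_PARSER_LOCK) { /* never reached */ }
      }
    }, "logging-thread");

    // Like tid=35: producer construction holds the AppInfoParser lock, then
    // AppInfo logs, which needs the Category lock.
    Thread producerInit = new Thread(() -> {
      synchronized (APP_INFO_PARSER_LOCK) {
        bothHoldFirstLock.countDown();
        awaitQuietly(bothHoldFirstLock);
        synchronized (CATEGORY_LOCK) { /* never reached */ }
      }
    }, "producer-init-thread");

    // Daemon threads, so the JVM can still exit after the demo.
    logging.setDaemon(true);
    producerInit.setDaemon(true);
    logging.start();
    producerInit.start();

    ThreadMXBean mx = ManagementFactory.getThreadMXBean();
    for (int attempt = 0; attempt < 100; attempt++) {
      long[] ids = mx.findMonitorDeadlockedThreads(); // null when no cycle exists
      if (ids != null) {
        return ids.length;
      }
      sleepQuietly(50);
    }
    return 0;
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("deadlocked threads: " + runDemo());
  }
}
```

      The latch guarantees that both threads hold their first lock before either requests its second, so the run should report both threads as deadlocked. Without the latch the deadlock would only be intermittent, as it was in production.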
      

      After some discussion with pmaheshwari, we concluded that making the StreamAppender asynchronous was the only reliable solution, because it moves the producer send (and any producer re-creation) off the thread that holds the log4j Category lock. There are two approaches:
      1. Use log4j2, which supports asynchronous logging. The downside is that we would have to rewrite StreamAppender and JmxAppender as log4j2 plugins instead of simply extending AppenderSkeleton.
      2. Add a queue and a thread to StreamAppender such that new events are added to the queue with a timeout, and the thread consumes from the queue and sends to the configured SystemProducer.

      Option 2 is preferable for now because it is quicker to implement and test. It can also easily be replaced by option 1 later, which is already a goal because we want to leverage the performance benefits of log4j2.
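      A minimal sketch of option 2, assuming plain String events and a hypothetical Consumer<String> sink standing in for the configured SystemProducer. The real StreamAppender extends log4j's AppenderSkeleton and serializes LoggingEvents, which this sketch omits, and the names AsyncStreamAppenderSketch, capacity and offerTimeoutMs are illustrative rather than Samza configuration. The key property is that append(), which runs on the logging thread while log4j holds a Category lock, only performs a bounded offer() on the queue. The send, and any KafkaProducer re-creation with its AppInfoParser locking and logging, happens on a separate thread that holds no Category lock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of option 2; not Samza's actual StreamAppender.
final class AsyncStreamAppenderSketch implements AutoCloseable {
  private final BlockingQueue<String> queue;
  private final long offerTimeoutMs;
  private final Thread sender;
  private volatile boolean running = true;

  /** sink stands in for SystemProducer.send(...); it runs only on the sender thread. */
  AsyncStreamAppenderSketch(int capacity, long offerTimeoutMs, Consumer<String> sink) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    this.offerTimeoutMs = offerTimeoutMs;
    this.sender = new Thread(() -> {
      // Keep draining until close() has been called and the queue is empty.
      while (running || !queue.isEmpty()) {
        try {
          String event = queue.poll(100, TimeUnit.MILLISECONDS);
          if (event != null) {
            sink.accept(event); // no log4j Category lock is held on this thread
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }, "stream-appender-sender");
    sender.setDaemon(true);
    sender.start();
  }

  /** Runs on the logging thread: only enqueues, waiting at most offerTimeoutMs.
   *  Returns false if the event was dropped because the queue stayed full. */
  boolean append(String event) {
    try {
      return queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Signals the sender thread to finish, then waits up to 5 s for it to drain the queue. */
  @Override
  public void close() {
    running = false;
    try {
      sender.join(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    AsyncStreamAppenderSketch appender =
        new AsyncStreamAppenderSketch(1024, 10, event -> System.out.println("sent: " + event));
    appender.append("hello");
    appender.close();
  }
}
```

      Dropping an event when the offer times out is a judgment call: it trades completeness of the log stream for the guarantee that logging never blocks indefinitely. That guarantee also covers the case where the sender thread's own log output (for example, Kafka's AppInfoParser messages) is routed back into this appender while the queue is full.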

            People

              Assignee: Jake Maes
              Reporter: Jake Maes