Samza / SAMZA-1537

StreamAppender can deadlock due to locks held by Kafka and Log4j


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.14.0
    • Component/s: None
    • Labels: None

      Description

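      The thread dumps of the two offending threads are below, but the basics are:
      1. AppInfoParser in Kafka uses static synchronized methods.
      2. Log4j synchronizes per Category.

      So if the StreamAppender, called while a Category lock is held, tries to create a new KafkaProducer, the constructor calls the static synchronized AppInfoParser.registerAppInfo method, which in turn tries to log to the same Category. With two threads on this path at once, one can hold the Category lock while waiting for the AppInfoParser lock, and the other can hold the AppInfoParser lock while waiting for the Category lock, so neither makes progress.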

      "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=23 BLOCKED
      	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
      	   Local Variable: java.lang.String#326563
      	   Local Variable: java.lang.String#329864
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
      	   Local Variable: java.util.ArrayList#265184
      	   Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
      	   Local Variable: java.util.LinkedHashMap#991
      	   Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#9
      	   Local Variable: java.util.ArrayList#265353
      	   Local Variable: org.apache.kafka.clients.NetworkClient#9
      	   Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
      	   Local Variable: java.util.ArrayList#265374
      	   Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
      	   Local Variable: java.lang.String#309971
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
      	   Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#7
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#8
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#10
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#9
      	   Local Variable: java.util.Properties#38
      	   Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
      	   Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
      	   Local Variable: java.lang.String#326561
      	   Local Variable: java.lang.IllegalStateException#2
      	   Local Variable: java.lang.String#330077
      	   Local Variable: org.apache.samza.system.SystemProducerException#4
      	   Local Variable: java.lang.Integer#15116
      	at org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
      	at com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
      	at com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
      	   Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
      	at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
      	at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
      	   Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
      	at org.apache.log4j.Category.callAppenders(Category.java:206)
      	   Local Variable: org.apache.log4j.spi.LoggingEvent#24
      	   Local Variable: org.apache.log4j.Logger#4
      	at org.apache.log4j.Category.forcedLog(Category.java:391)
      	at org.apache.log4j.Category.log(Category.java:856)
      	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
      	   Local Variable: java.util.concurrent.TimeUnit$3#1
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
      	   Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
      	   Local Variable: java.lang.Boolean#1
      	   Local Variable: org.apache.samza.system.SystemProducerException#2
      	   Local Variable: java.lang.Object#203455
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
      	at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
      	   Local Variable: org.apache.kafka.common.errors.TimeoutException#2
      	   Local Variable: java.util.ArrayList$Itr#6
      	at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
      	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
      	   Local Variable: java.util.ArrayList#263984
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#4
      	   Local Variable: java.util.ArrayList$Itr#5
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#34
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
      	   Local Variable: org.apache.kafka.common.Cluster#2
      	   Local Variable: java.util.Collections$EmptyMap#1
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
      	   Local Variable: java.util.HashMap$KeyIterator#2
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
      	   Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
      	at java.lang.Thread.run(Thread.java:745)
      
      
      "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=35 BLOCKED
      	at org.apache.log4j.Category.callAppenders(Category.java:204)
      	   Local Variable: org.apache.log4j.spi.LoggingEvent#26
      	   Local Variable: org.apache.log4j.Logger#15
      	at org.apache.log4j.Category.forcedLog(Category.java:391)
      	at org.apache.log4j.Category.log(Category.java:856)
      	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
      	at org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
      	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
      	   Local Variable: javax.management.ObjectName#162
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
      	   Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
      	   Local Variable: java.util.ArrayList#264895
      	   Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#7
      	   Local Variable: java.lang.String#308990
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
      	   Local Variable: java.util.LinkedHashMap#854
      	   Local Variable: java.util.ArrayList#264889
      	   Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
      	   Local Variable: java.util.ArrayList#264910
      	   Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
      	   Local Variable: org.apache.kafka.clients.NetworkClient#7
      	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#5
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#6
      	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
      	   Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
      	   Local Variable: java.util.Properties#67
      	   Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
      	   Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#14
      	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#13
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
      	   Local Variable: org.apache.samza.system.SystemProducerException#1
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
      	   Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
      	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
      	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
      	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
      	at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
      	   Local Variable: java.util.ArrayList$Itr#2
      	   Local Variable: org.apache.kafka.common.errors.TimeoutException#1
      	at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
      	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#10
      	   Local Variable: java.util.ArrayList$Itr#1
      	   Local Variable: java.util.ArrayList#263305
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#21
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
      	   Local Variable: org.apache.kafka.common.Cluster#1
      	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
      	   Local Variable: java.util.HashMap$KeyIterator#1
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
      	   Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
      	at java.lang.Thread.run(Thread.java:745)
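
The lock-order inversion visible in the two dumps above can be reproduced in miniature. The following is only an illustrative sketch, not Samza, Kafka, or Log4j code: `categoryLock` and `appInfoLock` are hypothetical stand-ins for the per-Category monitor and the class-level lock of the static synchronized AppInfoParser methods, and the class and thread names are made up. It uses daemon threads and the JDK's `ThreadMXBean` so that the program reports the deadlock and then exits instead of hanging.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class LockOrderInversionDemo {

    // Builds a daemon thread that takes `first`, waits until both threads
    // hold their first lock, and then tries to take `second`.
    static Thread lockInOrder(String name, Object first, Object second,
                              CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothHoldFirst.countDown();
                try {
                    bothHoldFirst.await();
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // Never reached: the other thread holds `second`.
                }
            }
        }, name);
        t.setDaemon(true); // let the JVM exit despite the deadlock
        return t;
    }

    // Starts two threads that take the same two locks in opposite order and
    // returns how many threads the JVM reports as monitor-deadlocked.
    static int runAndCountDeadlockedThreads() {
        Object categoryLock = new Object(); // stand-in for the Category monitor
        Object appInfoLock = new Object();  // stand-in for the AppInfoParser class lock
        CountDownLatch latch = new CountDownLatch(2);

        // Like tid=23: holds the Category lock, wants the AppInfoParser lock.
        lockInOrder("appender-path", categoryLock, appInfoLock, latch).start();
        // Like tid=35: holds the AppInfoParser lock, wants the Category lock.
        lockInOrder("producer-init-path", appInfoLock, categoryLock, latch).start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 100; i++) {
            long[] ids = mx.findMonitorDeadlockedThreads();
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + runAndCountDeadlockedThreads());
    }
}
```

Neither thread does anything wrong in isolation; the problem only appears because the two code paths acquire the same pair of locks in opposite order.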
      

      After some discussion with Prateek Maheshwari, we felt that making the StreamAppender asynchronous was the only reliable solution. There are two approaches to this:
      1. Use log4j2, which has async logging by default. The downside is that we'd have to update the StreamAppender and JmxAppender to be log4j2 plugins instead of simply extending AppenderSkeleton.
      2. Add a queue and a thread to StreamAppender, so that new events are added to the queue with some timeout and the thread consumes from the queue and sends to the configured SystemProducer.

      Option 2 is favorable right now because it's quicker to implement and test. It can also be easily replaced later by option 1, which is already a goal because we want to leverage the performance benefits of log4j2.
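
A minimal sketch of what option 2 could look like is shown here. It is not the actual StreamAppender change: the class `AsyncAppenderSketch`, its constructor parameters, and the `Consumer<String>` sink (a stand-in for sending to the configured SystemProducer) are all assumptions made for illustration, and events are plain strings rather than log4j LoggingEvents. The idea it shows is that the logging thread only enqueues with a bounded wait, while a separate thread does the potentially blocking send.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncAppenderSketch {
    private final BlockingQueue<String> queue;
    private final long offerTimeoutMs;
    private final Thread drainer;
    private volatile boolean running = true;

    // `sink` is a hypothetical stand-in for "send to the SystemProducer".
    AsyncAppenderSketch(int capacity, long offerTimeoutMs, Consumer<String> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.offerTimeoutMs = offerTimeoutMs;
        this.drainer = new Thread(() -> {
            try {
                // Keep draining until closed and the queue is empty.
                while (running || !queue.isEmpty()) {
                    String event = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        sink.accept(event);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-appender-drainer");
        this.drainer.setDaemon(true);
        this.drainer.start();
    }

    // Called on the logging thread: only enqueues, waiting at most
    // offerTimeoutMs. Returns false if the event was dropped.
    boolean append(String event) {
        try {
            return queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Stops accepting work and waits briefly for queued events to be sent.
    void close() {
        running = false;
        try {
            drainer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Small usage example: collects "sent" events in a list.
    static List<String> demo() {
        List<String> sent = new CopyOnWriteArrayList<>();
        AsyncAppenderSketch appender = new AsyncAppenderSketch(16, 50, sent::add);
        appender.append("first");
        appender.append("second");
        appender.close();
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The bounded queue and the offer timeout are a design choice: if the sink stalls, the logging thread waits at most the timeout and then drops the event rather than blocking indefinitely while it holds a logging lock.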

            People

            • Assignee: Jake Maes (jmakes)
            • Reporter: Jake Maes (jmakes)
