Kafka / KAFKA-244

Improve log4j appender to use kafka.producer.Producer, and support zk.connect|broker.list options

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7.1
    • Fix Version/s: 0.7.1
    • Component/s: clients
    • Labels:

      Description

      Taken from #kafka IRC session with Neha Narkhede:
      The log4j appender is quite obsolete; there are a few things to change there. Make it use kafka.producer.Producer instead of SyncProducer. That allows you to use either the broker.list or the zk.connect option.
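A minimal sketch of the two producer configurations this change enables, assuming the 0.7.x producer property keys `zk.connect`, `broker.list` and `serializer.class`, and the 0.7-style broker.list format `brokerId:host:port`. `ProducerProps` and its methods are hypothetical helpers, and `kafka.serializer.StringEncoder` is an assumed encoder choice; this is not code from the attached patches.

```scala
import java.util.Properties

// Hypothetical helper: builds producer properties with exactly one connection key set,
// either zk.connect (broker discovery through ZooKeeper) or a static broker.list.
object ProducerProps {
  def forZk(zkConnect: String): Properties = withConnection("zk.connect", zkConnect)

  def forBrokers(brokerList: String): Properties = withConnection("broker.list", brokerList)

  private def withConnection(key: String, value: String): Properties = {
    val props = new Properties()
    props.put(key, value)
    // Assumed encoder for String payloads; replace with the serializer actually in use.
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props
  }
}
```

With Kafka 0.7.x on the classpath, either set of properties would then be wrapped in a `ProducerConfig` and handed to `kafka.producer.Producer`; check the exact constructor and `send` signatures against the release in use.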

      1. Log4jAppender.patch
        9 kB
        Stefano Santoro
      2. Log4jAppenderWithWorkingZkConnect.patch
        13 kB
        Stefano Santoro
      3. kafka-244-v3.patch
        14 kB
        Neha Narkhede

        Activity

        nicu marasoiu added a comment -

        It looks similar to the initial test, but while debugging I found that this one configures Log4j for the first time when it is explicitly called in the test, whereas in the other test Log4j was already initialized at that point (it was a second call to Log4j's PropertyConfigurator.doConfigure).
        Other info: my tests use TestNG annotations. I mainly reproduced it with the rootLogger using the Kafka appender, but also when it did not.

        nicu marasoiu added a comment -

        Hello,

        KafkaLog4jAppender 0.7.1 as committed here has a critical bug caused by a Log4j deadlock, which is not caught by the embedded-server "integration" test. Let me explain why.

        If Log4j initializes for the first time from a log4j.properties file containing Kafka appenders, then when activateOptions() is called during Log4j startup, the ZooKeeper client is initialized and its send thread and event thread are started. The send thread issues a LOG.info. But since Log4j is still starting up, that logger has not yet been created or bound to its proper parent (say, logger.org.apache.zookeeper.client), so it tries to enter synchronized(rootLogger). That rootLogger, however, is still being created, or at least is locked by the other thread that is bringing up the Log4j infrastructure. So it is a plain Log4j deadlock.
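The lock cycle described above can be modelled in a few lines. This is a simplified sketch, not Log4j or ZooKeeper code: `rootLoggerLock` stands in for the monitor held while Log4j configures, the worker thread stands in for the ZooKeeper send thread, and it assumes (as the comment implies) that activateOptions() blocks until the client reports a connection. `Log4jDeadlockModel` and `connectWhileConfiguring` are hypothetical names.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Simplified model of the deadlock: nothing here calls Log4j or ZooKeeper.
object Log4jDeadlockModel {
  // Stands in for the monitor Log4j holds while configuring (e.g. the root logger).
  private val rootLoggerLock = new Object

  // Stands in for the ZooKeeper send thread: it must log (take the lock)
  // before it can report that the connection is established.
  private def startSendThread(connected: CountDownLatch): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = {
        rootLoggerLock.synchronized { () } // stands in for LOG.info(...)
        connected.countDown()
      }
    })
    t.setDaemon(true)
    t.start()
  }

  // activateOptions() during first-time configuration: the configuring thread keeps
  // the lock while waiting for the connection, so the send thread can never log.
  // A real client would wait forever; the timeout only keeps this model from hanging.
  def connectWhileConfiguring(timeoutMs: Long): Boolean =
    rootLoggerLock.synchronized {
      val connected = new CountDownLatch(1)
      startSendThread(connected)
      connected.await(timeoutMs, TimeUnit.MILLISECONDS) // false: the wait cannot succeed
    }
}
```

`CountDownLatch.await` does not release the monitor held by `synchronized`, which is exactly why the wait times out instead of completing.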

        The tests don't fail because Log4j is already initialized at that point and is only being reconfigured, so until reconfiguration completes it keeps using the old loggers.

        So if you attach KAFKA to the rootLogger, or even elsewhere, a KafkaAppender that connects to ZooKeeper during activateOptions() causes a Log4j deadlock whenever Log4j is not already initialized. We have written a wrapper that postpones activateOptions() until after Log4j has bootstrapped; it is called from a ServletContextListener when the application starts.
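The deferred-activation idea can be sketched as a wrapper that only queues the real activation and a registry that runs the queue later, for example from a ServletContextListener once startup is complete. This is a hypothetical sketch, not the commenter's DeferredActivationKafkaLog4jAppender (which is not shown): `Activatable` is a local stand-in with the same shape as Log4j 1.x's `OptionHandler.activateOptions()`, and `DeferredActivation` and `DeferredActivationWrapper` are illustrative names.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in with the same shape as Log4j 1.x OptionHandler.activateOptions().
trait Activatable {
  def activateOptions(): Unit
}

// Collects activations requested while Log4j is configuring and runs them later.
object DeferredActivation {
  private val pending = ListBuffer[Activatable]()

  def defer(target: Activatable): Unit = synchronized { pending += target }

  // Runs every deferred activation outside the lock and returns how many ran.
  def activateAll(): Int = {
    val toRun = synchronized {
      val snapshot = pending.toList
      pending.clear()
      snapshot
    }
    toRun.foreach(_.activateOptions())
    toRun.size
  }
}

// Log4j calls activateOptions() during configuration, but the real activation
// (which would connect to ZooKeeper) is only queued here.
class DeferredActivationWrapper(delegate: Activatable) extends Activatable {
  def activateOptions(): Unit = DeferredActivation.defer(delegate)
}
```

The sketch leaves out what happens to events appended before `activateAll()` runs; a real wrapper would have to buffer or drop them.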

        Here is my test class reproducing the issue:

        import java.io.File
        import java.util.Properties

        import org.apache.log4j.{Logger, PropertyConfigurator}
        import org.testng.Assert.assertEquals
        import org.testng.annotations.{AfterClass, BeforeClass, Test}

        import kafka.api.FetchRequest
        import kafka.consumer.SimpleConsumer
        import kafka.server.{KafkaConfig, KafkaServer}
        import kafka.utils.{Logging, Utils}
        import kafka.utils.TestUtils._
        import kafka.zk.EmbeddedZookeeper
        // Constants (ZOOKEEPER_RETRY, ZOOKEEPER_HOSTS) appears to be a project-specific class;
        // its import is not shown in the original comment.

        class DeferredActivationLog4jAppenderIntegrationTest extends Logging {

          val ZKHOST: String = "localhost"
          val zookeeperConnect: String = ZKHOST + ":8083"
          var logDirZk: File = null
          var kafServer: KafkaServer = null
          var zkConsumer: SimpleConsumer = null

          val tLogger = Logger.getLogger(getClass())

          private val brokerZk = 0
          private val ports = choosePorts(1)
          private val portZk = ports(0)

          private var zkServer: EmbeddedZookeeper = null

          @BeforeClass
          def before(): Unit = {
            before(true)
          }

          protected def before(retry: Boolean): Unit = {
            System.setProperty(Constants.ZOOKEEPER_RETRY, retry.toString)
            System.setProperty(Constants.ZOOKEEPER_HOSTS, zookeeperConnect)
            // Start an embedded ZooKeeper and a single Kafka broker
            zkServer = new EmbeddedZookeeper(zookeeperConnect)
            val propsZk = createBrokerConfig(brokerZk, portZk)
            val logDirZkPath = propsZk.getProperty("log.dir")
            logDirZk = new File(logDirZkPath)
            kafServer = createServer(new KafkaConfig(propsZk))
            Thread.sleep(100)
            zkConsumer = new SimpleConsumer(ZKHOST, portZk, 1000000, 64 * 1024)
          }

          @AfterClass
          def after(): Unit = {
            zkConsumer.close()
            kafServer.shutdown()
            Utils.rm(logDirZk)
            Thread.sleep(200)
            zkServer.shutdown()
            Thread.sleep(200)
          }

          @Test
          def testZkConnectLog4jAppends(): Unit = {
            // Log4j is configured here with the Kafka appender attached
            setupLog4j()

            for (i <- 1 to 5)
              info("test")

            Thread.sleep(500)

            val offset = 0L
            val messages = zkConsumer.fetch(new FetchRequest("test-topic", 0, offset, 1024 * 1024))

            var count = 0
            for (message <- messages)
              count = count + 1

            assertEquals(5, count)
          }

          def setupLog4j(): Unit = {
            PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
          }

          private def getLog4jConfigWithZkConnect: Properties = {
            val props = new Properties()
            props.put("log4j.debug", "true")
            props.put("log4j.appender.STDOUT", "org.apache.log4j.ConsoleAppender")
            props.put("log4j.appender.STDOUT.Target", "System.out")
            props.put("log4j.appender.STDOUT.layout", "org.apache.log4j.PatternLayout")
            props.put("log4j.appender.STDOUT.layout.ConversionPattern", "%-5p: %c - %m%n")
            // props.put("log4j.appender.KAFKA", "com.adobe.pass.commons.kafka.producer.DeferredActivationKafkaLog4jAppender")
            props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
            props.put("log4j.appender.KAFKA.Topic", "test-topic")
            props.put("log4j.appender.KAFKA.ZkConnect", zookeeperConnect)
            props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
            props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
            props.put("log4j.logger.org", "TRACE, KAFKA")
            props.put("log4j.logger.com", "TRACE, KAFKA")
            props.put("log4j.rootLogger", "INFO")
            props
          }
        }
        Neha Narkhede added a comment -

        Stefano,

        Committed this. Thanks for working on this patch! If you are interested in contributing some more, please take a look at https://issues.apache.org/jira/secure/IssueNavigator.jspa?mode=hide&requestId=12318577. This is a list of newbie JIRAs that are a good starting point for understanding the various Kafka components.

        We will be glad to help out where required.

        Stefano Santoro added a comment -

        Thanks for taking the time to look at it and modify it. I enjoyed the experience. Let me know if I can help with any other tasks in my spare time.

        Neha Narkhede added a comment -

        Thanks for uploading patch v2. I made one minor change to the patch -

        Removed the host and port parameters. They seem redundant given that we support the broker.list parameter.

        Stefano Santoro added a comment -

        Added a unit test for the ZkConnect parameter of the Kafka log4j appender. The appender now works with both connection scenarios: broker.list and zk.connect.

        Neha Narkhede added a comment -

        We had the host and port options before we ported all of that to the broker.list option. It seems like a good idea to keep configuration consistent across the code. Regarding the zk.connect option, I'm interested in seeing why it hangs too, though it sounded like you had a test that failed. I'll try writing one myself and see how that goes.

        Stefano Santoro added a comment -

        I opted for configuration flexibility by assigning a priority to each option: zk.connect, then broker.list, then the host/port pair (which is translated into broker.list). If more than one is specified, the one with the highest priority is picked. What I was hoping for, though, is that you would help me understand why specifying broker.list works, but specifying zk.connect hangs.
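The priority scheme described here (zk.connect first, then broker.list, then a host/port pair translated into broker.list) can be sketched as a small resolver. This is not the code from the attached patches: `AppenderOptions`, `Connection`, `ZkConnect`, `BrokerList` and `ConnectionPriority` are hypothetical names, and the host/port translation assumes the 0.7-style `brokerId:host:port` broker.list format with a broker id of 0.

```scala
// Hypothetical option holder; each field is None when the property was not configured.
case class AppenderOptions(
    zkConnect: Option[String],
    brokerList: Option[String],
    host: Option[String],
    port: Option[Int])

sealed trait Connection
case class ZkConnect(value: String) extends Connection
case class BrokerList(value: String) extends Connection

object ConnectionPriority {
  // Picks the highest-priority option that is set: zk.connect > broker.list > host/port.
  def resolve(o: AppenderOptions): Option[Connection] = {
    val fromZk: Option[Connection] = o.zkConnect.map(v => ZkConnect(v))
    val fromBrokers: Option[Connection] = o.brokerList.map(v => BrokerList(v))
    // Assumed translation of host/port into a single-entry broker.list with broker id 0.
    val fromHostPort: Option[Connection] =
      for (h <- o.host; p <- o.port) yield BrokerList(s"0:$h:$p")
    fromZk.orElse(fromBrokers).orElse(fromHostPort)
  }
}
```

The design silently ignores lower-priority options when several are set, which is the behaviour Neha's review below suggests replacing with an explicit error.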

        Neha Narkhede added a comment -

        Thanks for uploading the patch. A couple of comments -

        1. As part of adding the broker.list and zk.connect options, it will be a good idea to get rid of host and port usage in the log4j appender
        2. If both zk.connect and broker.list options are specified, maybe throwing an InvalidConfigException would be good, specifying that only one option should be set.
        3. Also, it will be good to add tests to test the zk.connect option in the log4j appender.
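Point 2 could look roughly like the following check. It is a sketch only: `InvalidConfigException` is declared locally so the snippet is self-contained (Kafka's own exception class is not used), `AppenderConfigCheck.requireExactlyOne` is a hypothetical name, and rejecting the case where neither option is set goes slightly beyond what the comment asks for. Unset values are treated as null because a Log4j property setter that is never called leaves the field at its default.

```scala
// Local stand-in for a configuration exception.
class InvalidConfigException(msg: String) extends RuntimeException(msg)

object AppenderConfigCheck {
  // Returns the single connection property that is set, as (propertyName, value).
  def requireExactlyOne(zkConnect: String, brokerList: String): (String, String) = {
    def isSet(s: String): Boolean = s != null && s.trim.nonEmpty
    (isSet(zkConnect), isSet(brokerList)) match {
      case (true, true) =>
        throw new InvalidConfigException("Only one of zk.connect and broker.list should be set")
      case (true, false)  => ("zk.connect", zkConnect)
      case (false, true)  => ("broker.list", brokerList)
      case (false, false) =>
        throw new InvalidConfigException("One of zk.connect or broker.list must be set")
    }
  }
}
```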

        Stefano Santoro added a comment -

        As you requested, I have uploaded the patch as a file, which I generated as follows:

        ssantoro@stefanoubu:~/dev/kafka$ svn update .
        At revision 1230275.

        ssantoro@stefanoubu:~/dev/kafka$ svn diff . > Log4jAppender.patch
        ssantoro@stefanoubu:~/dev/kafka$

        Neha Narkhede added a comment -

        Thanks for getting started on this. I have a couple of requests -

        1. Could you please upload a patch to this JIRA and grant it to Apache?
        2. This patch doesn't apply cleanly to trunk; would you mind uploading an updated patch that does?

        It will make it much easier for us to review it.

        Also, when you said the zk.connect option hangs, did you get a chance to look at the debug/trace logs to see where it hangs? Would you mind uploading those log files here?

        Stefano Santoro added a comment -

        With this patch the Kafka log4j appender supports formatters. I tested it thoroughly with logger properties specifying broker.list, but it hangs when I specify zk.connect instead. The new log4j properties are serializerClass (replacing encoder, for serializer.class), zkConnect (for zk.connect), and brokerList (for broker.list).
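A sketch of appender properties using these names, built the same way as the test in this thread builds its configuration. The property names follow this comment (brokerList, serializerClass) in the capitalized form the test uses for Topic and ZkConnect; the names in the final committed patch may differ. `KafkaAppenderProps.withBrokerList` is a hypothetical helper, and `kafka.serializer.StringEncoder` and the `0:localhost:9092` broker.list value are assumed examples.

```scala
import java.util.Properties

// Hypothetical helper: log4j properties for an appender named KAFKA that connects
// through a static broker list rather than through ZooKeeper.
object KafkaAppenderProps {
  def withBrokerList(topic: String, brokerList: String): Properties = {
    val props = new Properties()
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.Topic", topic)
    props.put("log4j.appender.KAFKA.BrokerList", brokerList)
    // Assumed String encoder, set through the serializerClass property.
    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.serializer.StringEncoder")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props
  }
}
```

The resulting `Properties` would be passed to `PropertyConfigurator.configure`, as the test in this thread does, together with logger entries that route events to KAFKA.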

        Show
        Stefano Santoro added a comment - with this patch the kafka lof4j appender supports formatters. I tested it well against logger properties specifying broker.list, but it hangs when I specify zk.connect instead. The new log4j properties are serializerClass (instead of encoder for serilizer.class), zkConnect (for zk.connect), and brokerList (for broker.list)
        Hide
        Stefano Santoro added a comment - edited

        See the attached patch file.


          People

          • Assignee:
            Stefano Santoro
            Reporter:
            Stefano Santoro
          • Votes:
            0
            Watchers:
            2

            Dates

            • Created:
              Updated:
              Resolved:
