Uploaded image for project: 'Log4j 2'
  1. Log4j 2
  2. LOG4J2-278

Embedded Flume agent fails to rollback

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.0-beta7
    • 2.0-beta8
    • Flume Appender
    • None

    Description

      If the embedded Flume agent decides to rollback (for example, because the succeeding node is full) then that rollback fails. It appears that this is because of Flume code attempting to recursively send log events. i.e. where other Log4j2 appenders use StatusLogger the embedded Flume agent does not.

      Steps:
      1. Setup a Flume agent as the succeeding agent and convince it to fill its queue. (In my case, I can make it run out of memory then the queue fills which makes it refuse to accept more messages.)
      2. Extract the attached code (a very simple Jetty server) and modify the log4j2.xml to have embedded=true for the FlumeAppender
      3. Run the attached code

      Expected result:
      The embedded agent should queue messages.

      Actual result:
      2013-06-10 14:39:11,173 ERROR An exception occurred processing Appender FlumeAppender java.lang.IllegalStateException: rollback() called when transaction is COMPLETED! identityHashCode=1164913745
      at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
      at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:171)
      at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:269)
      at org.apache.logging.log4j.flume.appender.Log4jEventSource.send(Log4jEventSource.java:59)
      at org.apache.logging.log4j.flume.appender.FlumeEmbeddedManager.send(FlumeEmbeddedManager.java:123)
      at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
      at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
      at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
      at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
      at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
      at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
      at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
      at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
      at org.apache.flume.channel.file.Log.rollback(Log.java:566)
      at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:590)
      at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:179)
      at org.apache.flume.sink.AvroSink.process(AvroSink.java:316)
      at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
      at java.lang.Thread.run(Thread.java:722)

      (note the identityHashCode is from my own modifications for debugging but does not affect this repro).

      The initial problem is in org.apache.flume.channel.file.Log line 566 where, during a rollback, it logs that it's rolling back. Flume transactions are held in a thread local. That transaction is open, then the rollback is attempted, which hits Log line 566, so a log write is attempted, which gets the same transaction, which is already open, which causes the exception.

      If I explicitly set the logging on org.apache.flume.channel.file.Log to warn then I get:
      013-06-10 15:25:25,673 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor-1241018503] appender.Log4jEventSource (Log4jEventSource.java:61) - Unabled to process event {}[Event headers =

      {timeStamp=1370903125671, guId=a5c6c2bf-d21c-11e2-a12c-24be05270b5c}

      , body.length = 216 ]
      org.apache.flume.ChannelException: Unable to put event on required channel: FileChannel primary

      { dataDirs: [/var/local/flume/castellan-reader-berkeley-db/data] }

      txn: 1638840125
      at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275) ~[flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at org.apache.logging.log4j.flume.appender.Log4jEventSource.send(Log4jEventSource.java:59) [log4j-flume-ng-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.flume.appender.FlumeEmbeddedManager.send(FlumeEmbeddedManager.java:123) [log4j-flume-ng-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86) [log4j-flume-ng-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102) [log4j-core-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424) [log4j-core-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405) [log4j-core-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366) [log4j-core-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.core.Logger.log(Logger.java:110) [log4j-core-2.0-beta7.jar:2.0-beta7]
      at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55) [log4j-api-2.0-beta7.jar:2.0-beta7]
      at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:154) [log4j-slf4j-impl-2.0-beta7.jar:2.0-beta7]
      at org.apache.flume.sink.AvroSink.destroyConnection(AvroSink.java:199) [flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at org.apache.flume.sink.AvroSink.verifyConnection(AvroSink.java:224) [flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at org.apache.flume.sink.AvroSink.process(AvroSink.java:282) [flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:162) [flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) [flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at java.lang.Thread.run(Thread.java:722) [?:1.7.0_21]
      Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
      at com.google.common.base.Preconditions.checkState(Preconditions.java:145) ~[guava-10.0.1.jar:?]
      at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:133) ~[flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:263) ~[flume-ng-core-1.3.1-SIN124-SNAPSHOT.jar:1.3.1-SIN124-SNAPSHOT]
      ... 16 more
      2013-06-10 15:25:25,675 ERROR An exception occurred processing Appender FlumeAppender org.apache.flume.ChannelException: Unable to put event on required channel: FileChannel primary

      { dataDirs: [/var/local/flume/castellan-reader-berkeley-db/data] }

      txn: 1638840125
      at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
      at org.apache.logging.log4j.flume.appender.Log4jEventSource.send(Log4jEventSource.java:59)
      at org.apache.logging.log4j.flume.appender.FlumeEmbeddedManager.send(FlumeEmbeddedManager.java:123)
      at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
      at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
      at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
      at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
      at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
      at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
      at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
      at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:154)
      at org.apache.flume.sink.AvroSink.destroyConnection(AvroSink.java:199)
      at org.apache.flume.sink.AvroSink.verifyConnection(AvroSink.java:224)
      at org.apache.flume.sink.AvroSink.process(AvroSink.java:282)
      at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:162)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
      at java.lang.Thread.run(Thread.java:722)
      Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
      at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
      at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:133)
      at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:263)
      ... 16 more

      That failure appears to be AvroSink line 199.

      Attachments

        1. flume-embedded-web-flume-2067.tar.gz
          4 kB
          Edward Sargisson

        Activity

          People

            Unassigned Unassigned
            ejsarge Edward Sargisson
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: