Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-3337

ChannelProcessor#processEventBatch() method catch exception code block may override actual exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Critical
    • Resolution: Unresolved
    • 1.9.0
    • None
    • Channel
    • None

    Description

      I encountered an exception, as following:

      KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
      at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
      at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
      at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
      at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
      at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
      at java.lang.Thread.run(Thread.java:748)

       ChannelProcessor#processEventBatch() method catch exception if put evet to channel error, but the catch code block first call tx.rollback(), however tx.rollback() method may throw new exception, this moment, the actual exception is overrided, we did't know the actual exception.We should first log the excpetion, then call tx.rollback().

      public void processEventBatch(List<Event> events) {
      for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
      tx.begin();
      
      List<Event> batch = reqChannelQueue.get(reqChannel);
      
      for (Event event : batch) {
      reqChannel.put(event);
      }
      
      tx.commit();
      } catch (Throwable t) {
      // this line may throw new exception, the actual exception is overrided, we did't know the actual exception
      tx.rollback();
      if (t instanceof Error) {
      LOG.error("Error while writing to required channel: " + reqChannel, t);
      throw (Error) t;
      } else if (t instanceof ChannelException) {
      throw (ChannelException) t;
      } else {
      throw new ChannelException("Unable to put batch on required " +
      "channel: " + reqChannel, t);
      }
      } finally {
      
      }
      }
      }
      

       

       

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              ykgarfield yangkun
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 10m
                  10m