Flume / FLUME-3043

KafkaSink SinkCallback throws NullPointerException when Log4J level is debug

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.7.0
    • Fix Version/s: 1.8.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      When an event is sent to Kafka with SinkCallback while the Log4J level is DEBUG, and Kafka responds with an exception instead of a RecordMetadata, the Kafka Sink throws an NPE.

      code in SinkCallback:

      if (logger.isDebugEnabled()) {
        long eventElapsedTime = System.currentTimeMillis() - startTime;
        logger.debug("Acked message partition:{} ofset:{}",  metadata.partition(), metadata.offset());
        logger.debug("Elapsed time for send: {}", eventElapsedTime);
      }
      

      code in Kafka Producer:

      if (exception == null) { 
        RecordMetadata metadata = new RecordMetadata(...);
        thunk.callback.onCompletion(metadata, null);
      } else {
        thunk.callback.onCompletion(null, exception);
      }
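
The two snippets above suggest the failure mode: on a failed send the producer passes a null metadata to the callback, and the debug branch dereferences it. Below is a minimal sketch of a null-safe version of that debug logic. It is not the actual Flume patch. The `Metadata` class is a hypothetical stand-in for Kafka's `RecordMetadata`, and `describeCompletion` is an illustrative helper that returns the log lines instead of calling a logger, so the example runs without Kafka or a logging framework on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSafeCallbackSketch {

    /** Hypothetical stand-in for Kafka's RecordMetadata (only the two accessors used here). */
    static final class Metadata {
        private final int partition;
        private final long offset;

        Metadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        int partition() { return partition; }
        long offset() { return offset; }
    }

    /**
     * Builds the debug lines for a completed send.
     * metadata is null when the send failed; exception is null when it succeeded.
     */
    static List<String> describeCompletion(Metadata metadata, Exception exception, long elapsedMillis) {
        List<String> lines = new ArrayList<>();
        if (exception != null) {
            lines.add("Error sending message to Kafka: " + exception.getMessage());
        }
        // Guard: only touch metadata when the producer actually supplied one.
        if (metadata != null) {
            lines.add("Acked message partition:" + metadata.partition()
                    + " offset:" + metadata.offset());
        }
        lines.add("Elapsed time for send: " + elapsedMillis);
        return lines;
    }

    public static void main(String[] args) {
        // Success path: metadata present, no exception.
        describeCompletion(new Metadata(0, 42L), null, 5L).forEach(System.out::println);
        // Failure path: metadata is null, which is the case that triggered the NPE.
        describeCompletion(null, new RuntimeException("broker unavailable"), 7L)
                .forEach(System.out::println);
    }
}
```

In a real callback the same guard would wrap the `metadata.partition()` and `metadata.offset()` calls inside the `logger.isDebugEnabled()` block, leaving the elapsed-time message unconditional.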
      

          Activity

          hudson Hudson added a comment -

          SUCCESS: Integrated in Jenkins build Flume-trunk-hbase-1 #245 (See https://builds.apache.org/job/Flume-trunk-hbase-1/245/)
          FLUME-3043. Fix NPE in Kafka Sink and Channel (bessbd: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=c718dae09d10db640cb9eb59f8abb11bd385a799)

          • (edit) flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
          • (edit) flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flume/pull/125

          jira-bot ASF subversion and git services added a comment -

          Commit c718dae09d10db640cb9eb59f8abb11bd385a799 in flume's branch refs/heads/trunk from dengkai
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=c718dae ]

          FLUME-3043. Fix NPE in Kafka Sink and Channel

          When logging level is set to DEBUG, Kafka Sink and Kafka Channel may throw a NullPointerException.

          This patch ensures that `metadata` is not null to avoid the exception.

          This closes #125

          Reviewers: Denes Arvay, Bessenyei Balázs Donát

          (loleek via Bessenyei Balázs Donát)

          githubbot ASF GitHub Bot added a comment -

          GitHub user loleek opened a pull request:

          https://github.com/apache/flume/pull/125

          bugfix: when loglevel is debug, Kafka Sink will throw NPE because of …

          When an event is sent to Kafka with SinkCallback while the Log4J level is DEBUG, and Kafka responds with an exception instead of a RecordMetadata, the Kafka Sink throws an NPE.

          code in SinkCallback:

          if (logger.isDebugEnabled()) {
            long eventElapsedTime = System.currentTimeMillis() - startTime;
            logger.debug("Acked message partition:{} ofset:{}", metadata.partition(), metadata.offset());
            logger.debug("Elapsed time for send: {}", eventElapsedTime);
          }

          code in Kafka Producer:

          if (exception == null) {
            RecordMetadata metadata = new RecordMetadata(...);
            thunk.callback.onCompletion(metadata, null);
          } else {
            thunk.callback.onCompletion(null, exception);
          }

          I've already created this issue on jira.
          https://issues.apache.org/jira/browse/FLUME-3043?jql=project%20%3D%20FLUME

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/loleek/flume npe-kafka-sink

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flume/pull/125.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #125


          commit dcad5ff6651c4c1714ebb7c9fae29d377096f48e
          Author: dengkai02 <dengkai02@baidu.com>
          Date: 2017-03-29T11:27:29Z

          bugfix: when loglevel is debug, Kafka Sink will throw NPE because of metadata can be null



            People

            • Assignee:
              lolee_k dengkai
              Reporter:
              lolee_k dengkai