Flume / FLUME-3340

Error with kerberized HDFS sink


    Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: Configuration
    • Labels:
      None
    • Environment:

      Flume 1.6 (embedded release in CDH 15.16.1)

      Kafka 2.11-2.1.1cp1-1 (Confluent Community)
      3 secured brokers with SASL_PLAINTEXT (mechanism PLAIN)

      Hadoop 2.6.0 (CDH 15.6.1), kerberized
      RHEL 6.9

      Description

      Hello,

      I'm a big data administrator working with the Cloudera distribution.

      We use Flume to collect external data and push it into HDFS (Hadoop).

      We have an issue with the HDFS sink, with these messages:

       

      2019-07-11 09:22:12,147 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs:/app/2019-07-11/apps_flume01_2019-07-11.1562829732122.json.tmp

      2019-07-11 09:22:12,441 ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
      java.lang.NullPointerException
      at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
      at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
      at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
      at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
      at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
      at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
      at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
      at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
      at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
      at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
      at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
      at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
      at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
      at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
      at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
      at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
      at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
      at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
      at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
      at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
      at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
      at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
      at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      2019-07-11 09:22:12,443 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
      org.apache.flume.EventDeliveryException: java.lang.NullPointerException
      at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
      at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.NullPointerException
      at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
      at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
      at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
      at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
      at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
      at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
      at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
      at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
      at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
      at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
      at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
      at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
      at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
      at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
      at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
      at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
      at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
      at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
      at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
      at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
      at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
      at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
      at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      ... 1 more

       

       

      The Kafka source (Confluent Community) is secured with SASL_PLAINTEXT.
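      For context on where the NullPointerException originates: Hadoop's FastSaslServerFactory enumerates every SASL server factory registered in the JVM and calls getMechanismNames(null) on each, while Kafka's PlainSaslServerFactory (PlainSaslServer.java:165 in the trace) dereferences that props map without a null check. A minimal sketch of the interaction — ToyPlainFactory is a hypothetical stand-in mirroring the Kafka factory's logic, not the real class:

```java
import java.util.Map;

// Hypothetical stand-in for Kafka's PlainSaslServer.PlainSaslServerFactory:
// it reads a policy flag from `props` with no null check.
class ToyPlainFactory {
    static String[] getMechanismNames(Map<String, ?> props) {
        // props.get(...) throws NullPointerException when props == null,
        // which is what Hadoop's FastSaslServerFactory passes in.
        String noPlainText = (String) props.get("javax.security.sasl.policy.noplaintext");
        if ("true".equals(noPlainText))
            return new String[0];
        return new String[] { "PLAIN" };
    }
}

public class SaslNpeDemo {
    public static void main(String[] args) {
        // Normal Kafka usage passes a (possibly empty) map: works fine.
        System.out.println(ToyPlainFactory.getMechanismNames(Map.of())[0]);

        // Hadoop's SaslRpcServer init enumerates all registered factories
        // with null props, reproducing the NPE from the Flume log above.
        try {
            ToyPlainFactory.getMechanismNames(null);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in SaslRpcServer init");
        }
    }
}
```

      Because the Kafka source registers its SASL provider in the same JVM before the HDFS sink first contacts the NameNode, the sink's SaslRpcServer initialization trips over it.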

       

      The Flume configuration is:

       

      agent.sources = apps
      agent.channels = appsChannel
      agent.sinks = appsSink

      ###
      # Source definition
      ###
      agent.sources.apps.type = org.apache.flume.source.kafka.KafkaSource
      agent.sources.apps.kafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

      agent.sources.apps.kafka.topics = mytopic01

      agent.sources.apps.kafka.consumer.client.id=ClientIDapps
      agent.sources.apps.kafka.consumer.group.id=GroupIDapps

      agent.sources.apps.channels = appsChannel
      agent.sources.apps.batchSize=500

      agent.sources.apps.interceptors = i1 hostint
      agent.sources.apps.interceptors.i1.type = timestamp
      agent.sources.apps.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
      agent.sources.apps.interceptors.hostint.preserveExisting = true
      agent.sources.apps.interceptors.hostint.useIP = false

      agent.sources.apps.kafka.consumer.security.protocol=SASL_PLAINTEXT
      agent.sources.apps.kafka.consumer.sasl.mechanism=PLAIN
      agent.sources.apps.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=username password=password ;
      ###
      # Channel definition
      ###

      agent.channels.appsChannel.type = memory
      agent.channels.appsChannel.capacity = 500000
      agent.channels.appsChannel.transactionCapacity = 1000

       

      ###
      # Sink definition
      ###

      agent.sinks.appsSink.type = hdfs
      agent.sinks.appsSink.hdfs.kerberosPrincipal = $KERBEROS_PRINCIPAL
      agent.sinks.appsSink.hdfs.kerberosKeytab = $KERBEROS_KEYTAB
      agent.sinks.appsSink.maxOpenFiles = 100
      agent.sinks.appsSink.hdfs.path = hdfs:/apps/%Y-%m-%d
      agent.sinks.appsSink.hdfs.filePrefix=apps_%{host}_%Y-%m-%d
      agent.sinks.appsSink.hdfs.fileSuffix=.json
      agent.sinks.appsSink.hdfs.rollInterval=60
      agent.sinks.appsSink.hdfs.rollSize=0
      agent.sinks.appsSink.hdfs.rollCount=100000
      agent.sinks.appsSink.hdfs.idleTimeout=60
      agent.sinks.appsSink.hdfs.callTimeout=60000
      agent.sinks.appsSink.hdfs.batchSize=1000
      agent.sinks.appsSink.hdfs.fileType=DataStream
      agent.sinks.appsSink.hdfs.writeFormat=Writable
      agent.sinks.appsSink.hdfs.useLocalTimeStamp=false
      agent.sinks.appsSink.hdfs.serializer=TEXT
      agent.sinks.appsSink.channel = appsChannel
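      For reference, the source's kafka.consumer.* keys above are pass-through properties: Flume's KafkaSource strips the kafka.consumer. prefix and hands the remainder to the Kafka consumer as-is. A sketch of the equivalent raw consumer properties (values are the placeholders from the config above, not real credentials; note that Kafka's documented JAAS examples quote the username/password values):

```java
import java.util.Properties;

public class ConsumerProps {
    public static void main(String[] args) {
        Properties p = new Properties();
        // What the kafka.consumer.* lines in the Flume config resolve to:
        p.put("security.protocol", "SASL_PLAINTEXT");
        p.put("sasl.mechanism", "PLAIN");
        p.put("sasl.jaas.config",
              "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"username\" password=\"password\";");
        System.out.println(p.getProperty("sasl.mechanism"));
    }
}
```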

      Best Regards,
      ODa

        People

        • Assignee: Unassigned
        • Reporter: ODa ODa
        • Votes: 0
        • Watchers: 1
