Flume / FLUME-3012

Sending a TERM signal cannot shut down the Flume agent when KafkaChannel fails to start


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: Kafka Channel
    • Labels:
    • Environment:

      Flume 1.6.0+Kafka 0.9

      Description

      Use Kafka Channel in the agent configuration:
      tier1.sources = source1
      tier1.channels = channel1
      tier1.sinks = sink1
      tier1.sources.source1.type = exec
      tier1.sources.source1.command = /usr/bin/vmstat 1
      tier1.sources.source1.channels = channel1
      tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
      tier1.channels.channel1.kafka.bootstrap.servers = a.b.c.d:6667
      tier1.sinks.sink1.type = hdfs
      tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
      tier1.sinks.sink1.hdfs.rollInterval = 5
      tier1.sinks.sink1.hdfs.rollSize = 0
      tier1.sinks.sink1.hdfs.rollCount = 0
      tier1.sinks.sink1.hdfs.fileType = DataStream
      tier1.sinks.sink1.channel = channel1

      If kafka.bootstrap.servers is accidentally misconfigured (here a.b.c.d:6667 does not resolve), the following errors are thrown:
      ......
      )] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:50,739 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:51,240 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:51,735 (lifecycleSupervisor-1-6) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:115)] Starting Kafka Channel: channel1
      2016-10-21 01:15:51,737 (lifecycleSupervisor-1-6) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)] ProducerConfig values:
      compression.type = none
      metric.reporters = []
      metadata.max.age.ms = 300000
      metadata.fetch.timeout.ms = 60000
      reconnect.backoff.ms = 50
      sasl.kerberos.ticket.renew.window.factor = 0.8
      bootstrap.servers = [a.b.c.d:6667]
      retry.backoff.ms = 100
      sasl.kerberos.kinit.cmd = /usr/bin/kinit
      buffer.memory = 33554432
      timeout.ms = 30000
      key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      sasl.kerberos.service.name = null
      sasl.kerberos.ticket.renew.jitter = 0.05
      ssl.keystore.type = JKS
      ssl.trustmanager.algorithm = PKIX
      block.on.buffer.full = false
      ssl.key.password = null
      max.block.ms = 60000
      sasl.kerberos.min.time.before.relogin = 60000
      connections.max.idle.ms = 540000
      ssl.truststore.password = null
      max.in.flight.requests.per.connection = 5
      metrics.num.samples = 2
      client.id =
      ssl.endpoint.identification.algorithm = null
      ssl.protocol = TLS
      request.timeout.ms = 30000
      ssl.provider = null
      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      acks = all
      batch.size = 16384
      ssl.keystore.location = null
      receive.buffer.bytes = 32768
      ssl.cipher.suites = null
      ssl.truststore.type = JKS
      security.protocol = PLAINTEXT
      retries = 0
      max.request.size = 1048576
      value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
      ssl.truststore.location = null
      ssl.keystore.password = null
      ssl.keymanager.algorithm = SunX509
      metrics.sample.window.ms = 30000
      partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      send.buffer.bytes = 131072
      linger.ms = 0

      2016-10-21 01:15:51,742 (lifecycleSupervisor-1-6) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name bufferpool-wait-time
      2016-10-21 01:15:51,743 (lifecycleSupervisor-1-6) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name buffer-exhausted-records
      2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [INFO - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)] Closing the Kafka producer with timeoutMillis = 0 ms.
      2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [DEBUG - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:654)] The Kafka producer has closed.
      2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start org.apache.flume.channel.kafka.KafkaChannel

      {name: channel1}

      - Exception follows.
      org.apache.kafka.common.KafkaException: Failed to construct kafka producer
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
      at org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:116)
      at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers: a.b.c.d:6667
      at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:269)
      ... 10 more
      2016-10-21 01:15:51,745 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      ##################INTO-ShutdownHook-NOW########################
      2016-10-21 01:15:52,246 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:52,748 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:53,248 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:53,749 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:54,250 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      2016-10-21 01:15:54,745 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:115)] Starting Kafka Channel: channel1
      2016-10-21 01:15:54,747 (lifecycleSupervisor-1-3) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)] ProducerConfig values:
      [ProducerConfig values identical to the first attempt above]

      2016-10-21 01:15:54,747 (lifecycleSupervisor-1-3) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name bufferpool-wait-time
      2016-10-21 01:15:54,748 (lifecycleSupervisor-1-3) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name buffer-exhausted-records
      2016-10-21 01:15:54,787 (lifecycleSupervisor-1-3) [INFO - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)] Closing the Kafka producer with timeoutMillis = 0 ms.
      2016-10-21 01:15:54,788 (lifecycleSupervisor-1-3) [DEBUG - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:654)] The Kafka producer has closed.
      2016-10-21 01:15:54,788 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start org.apache.flume.channel.kafka.KafkaChannel

      {name: channel1}

      - Exception follows.
      org.apache.kafka.common.KafkaException: Failed to construct kafka producer
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
      at org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:116)
      at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers: a.b.c.d:6667
      at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:269)
      ... 10 more
      2016-10-21 01:15:54,789 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
      ......

      Issuing "kill -15 <flume-pid>" to shut down the process does not work. The log shows that the ShutdownHook was invoked, but it does not perform the cleanup.
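      The repeated "Waiting for channel: channel1 to start. Sleeping for 500 ms" lines suggest why the agent hangs: the conf-file-poller thread appears to spin until the channel reaches the START state, which a permanently failing channel never does. The sketch below is a paraphrase of that observed behavior, not the actual Flume source; the class, method, and state names are illustrative only (a bound on attempts is added here so the demo terminates).

      ```java
      // Illustrative sketch (hypothetical names, NOT the real Flume code) of a
      // wait loop that polls a component's lifecycle state. If the component is
      // stuck in ERROR, the loop only exits via the artificial attempt bound;
      // an unbounded version would spin forever and block shutdown, matching
      // the repeated "Waiting for channel" log lines seen after SIGTERM.
      public class WaitLoopSketch {
        enum LifecycleState { IDLE, START, ERROR }

        // Returns how many polling rounds were spent waiting.
        static int waitForStart(LifecycleState state, int maxAttempts)
            throws InterruptedException {
          int attempts = 0;
          while (state != LifecycleState.START && attempts < maxAttempts) {
            System.out.println(
                "Waiting for channel: channel1 to start. Sleeping for 500 ms");
            Thread.sleep(10); // shortened from 500 ms for the sketch
            attempts++;
          }
          return attempts;
        }

        public static void main(String[] args) throws InterruptedException {
          // Channel failed to start, so its state never becomes START:
          // the loop runs until the demo-only bound is hit.
          int attempts = waitForStart(LifecycleState.ERROR, 5);
          System.out.println("gave up after " + attempts + " attempts");
        }
      }
      ```

      With an unbounded loop and a channel that can never start, a TERM signal's shutdown hook would have to wait behind this thread, which is consistent with the behavior reported above.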

        Attachments

        1. threaddumps.log (17 kB, uploaded by Ping Wang)


            People

            • Assignee: Unassigned
            • Reporter: wpwang (Ping Wang)
            • Votes: 0
            • Watchers: 2
