FLUME-1079

Flume agent reconfiguration enters permanent bad state

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0
    • Component/s: Node
    • Labels: None
    • Environment: CentOS 6.2 64-bit, JDK 1.6.0_26 64-bit

    Description

      Using flume trunk, commit ad24cb31bb1b5a0d1ee4b0ec18572a223ed9d397

      Steps:
      1) Start with this config in a1.properties:

      # a = agent
      # r = source
      # c = channel
      # k = sink
      a1.sources = r1
      a1.channels = c1
      a1.sinks = k1
      # ===SOURCES===
      a1.sources.r1.type = NETCAT
      a1.sources.r1.channels = c1
      a1.sources.r1.bind = localhost
      a1.sources.r1.port = 1473
      # ===CHANNELS===
      a1.channels.c1.type = MEMORY
      # ===SINKS===
      a1.sinks.k1.type = NULL
      a1.sinks.k1.channel = c1

      2) Run the flume node:
      bin/flume-ng node --conf conf --conf-file conf/a1.properties --name a1

      3) Update the a1.properties file to add a new source, r2, on the same port as r1. This causes a port bind exception on r2 because r1 is already using port 1473:

      # a = agent
      # r = source
      # c = channel
      # k = sink
      a1.sources = r1 r2
      a1.channels = c1
      a1.sinks = k1
      # ===SOURCES===
      a1.sources.r1.type = NETCAT
      a1.sources.r1.channels = c1
      a1.sources.r1.bind = localhost
      a1.sources.r1.port = 1473
      a1.sources.r2.type = AVRO
      a1.sources.r2.channels = c1
      a1.sources.r2.bind = localhost
      a1.sources.r2.port = 1473
      # ===CHANNELS===
      a1.channels.c1.type = MEMORY
      # ===SINKS===
      a1.sinks.k1.type = NULL
      a1.sinks.k1.channel = c1

      ...and updating the props file to the above config results in the following error (after waiting up to 30 seconds for the reconfiguration to be noticed):
      2012-03-28 18:11:24,027 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:205)] Failed to load configuration data. Exception follows.
      java.lang.NullPointerException
      at org.apache.flume.source.AvroSource.stop(AvroSource.java:137)
      at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:45)
      at org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise(LifecycleSupervisor.java:155)
      at org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.onNodeConfigurationChanged(DefaultLogicalNodeManager.java:66)
      at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:217)
      at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
      at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
      at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:662)
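The trace above shows the NullPointerException coming from AvroSource.stop(). One plausible reading, which is an assumption since the attached patches are not reproduced here, is that r2 never finished starting because port 1473 was already taken, so stop() ran against a field that had never been set. The sketch below uses a hypothetical PortSource class (not Flume's AvroSource) to illustrate that pattern and a null guard that makes stop() safe after a failed start.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical stand-in for a network source; this is NOT Flume's AvroSource.
public class PortSource {
    private final int port;
    private ServerSocket server; // stays null if start() fails to bind

    public PortSource(int port) {
        this.port = port;
    }

    // Tries to bind the port on the loopback address.
    // Returns false instead of throwing when the bind fails.
    public boolean start() {
        try {
            server = new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Returns the bound port (useful when constructed with port 0), or -1 if not started.
    public int boundPort() {
        return server == null ? -1 : server.getLocalPort();
    }

    // Safe to call even if start() never succeeded.
    public void stop() {
        if (server == null) {
            return; // nothing was started, so there is nothing to close
        }
        try {
            server.close();
        } catch (IOException e) {
            // close failures are ignored in this sketch
        } finally {
            server = null;
        }
    }

    public static void main(String[] args) {
        PortSource r1 = new PortSource(0); // port 0 lets the OS pick a free port
        System.out.println("r1 started: " + r1.start());
        PortSource r2 = new PortSource(r1.boundPort()); // deliberately the same port as r1
        System.out.println("r2 started: " + r2.start());
        r2.stop(); // no NullPointerException despite the failed start
        r1.stop();
    }
}
```

Without the null check in stop(), the call on r2 would dereference a field that was never assigned, which is the same shape of failure as the trace above.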

      4) Now correct the config by changing r2's port to 1474:

      # a = agent
      # r = source
      # c = channel
      # k = sink
      a1.sources = r1 r2
      a1.channels = c1
      a1.sinks = k1
      # ===SOURCES===
      a1.sources.r1.type = NETCAT
      a1.sources.r1.channels = c1
      a1.sources.r1.bind = localhost
      a1.sources.r1.port = 1473
      a1.sources.r2.type = AVRO
      a1.sources.r2.channels = c1
      a1.sources.r2.bind = localhost
      a1.sources.r2.port = 1474
      # ===CHANNELS===
      a1.channels.c1.type = MEMORY
      # ===SINKS===
      a1.sinks.k1.type = NULL
      a1.sinks.k1.channel = c1

      ...but this results in an illegal state:
      java.lang.IllegalStateException: Unaware of SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5090d8ea counterGroup:{ name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5, runner.interruptions=1} } } - can not unsupervise
      at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
      at org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise(LifecycleSupervisor.java:145)
      at org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.onNodeConfigurationChanged(DefaultLogicalNodeManager.java:61)
      at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:217)
      at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
      at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
      at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:662)

      ...which tells me that we've entered a permanent bad state that would require restarting the agent.
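The "Unaware of SinkRunner" message suggests that the earlier failed reload stopped partway through, leaving the node manager and the LifecycleSupervisor with different views of which components are supervised. That interpretation is an assumption drawn from the two traces, not from the Flume source. As a minimal sketch of one way to avoid this kind of partial teardown, the hypothetical StopAll helper below stops every component and collects failures instead of aborting on the first exception. The Component interface and stopAll method are illustrative names, not Flume APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class StopAll {
    // Hypothetical minimal lifecycle interface used only in this sketch.
    public interface Component {
        void stop();
    }

    // Stops every component in order. A failure in one component is recorded
    // and does not prevent the remaining components from being stopped.
    public static List<RuntimeException> stopAll(List<Component> components) {
        List<RuntimeException> failures = new ArrayList<>();
        for (Component c : components) {
            try {
                c.stop();
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> stopped = new ArrayList<>();
        List<Component> components = new ArrayList<>();
        // Simulates a source whose stop() fails, like r2 in the report.
        components.add(() -> { throw new NullPointerException("r2 never started"); });
        // Simulates a sink that stops normally.
        components.add(() -> stopped.add("k1"));

        List<RuntimeException> failures = stopAll(components);
        System.out.println("failures=" + failures.size() + " stopped=" + stopped);
    }
}
```

With this approach the sink is still stopped even though the source's stop() threw, so the bookkeeping for the remaining components stays consistent and the caller can decide what to do with the collected failures.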

      5) Start the avro-client. Had there been no errors in the previous steps, we would expect the avro-client to connect to the agent, but the connection is refused:
      bin/flume-ng avro-client --cnf --host localhost --port 1474 --filename /home/will/bigdata.txt

      2012-03-28 18:27:35,650 (main) [ERROR - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:72)] Unable to open connection to Flume. Exception follows.
      org.apache.flume.FlumeException: RPC connection error. Exception follows.
      at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:114)
      at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:96)
      at org.apache.flume.api.NettyAvroRpcClient.access$100(NettyAvroRpcClient.java:50)
      at org.apache.flume.api.NettyAvroRpcClient$Builder.build(NettyAvroRpcClient.java:389)
      at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:45)
      at org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:120)
      at org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:64)
      Caused by: java.io.IOException: Error connecting to localhost/127.0.0.1:1474
      at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:250)
      at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:199)
      at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:148)
      at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:116)
      at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:107)
      ... 6 more
      Caused by: java.net.ConnectException: Connection refused
      at sun.nio.ch.SocketChannelImpl.$$YJP$$checkConnect(Native Method)
      at sun.nio.ch.SocketChannelImpl.checkConnect(SocketChannelImpl.java)
      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
      at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:384)
      at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:354)
      at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:276)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:662)
      2012-03-28 18:27:35,683 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:77)] Exiting
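The "Connection refused" cause indicates that nothing is listening on port 1474, which is consistent with r2 never having started. A quick way to confirm which ports the agent actually has open is a plain TCP connect probe. The sketch below is a standalone helper with a hypothetical class name (PortCheck); it does not use any Flume API.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // covers "Connection refused" as well as timeouts
        }
    }

    public static void main(String[] args) {
        // Ports taken from the configuration in this report.
        System.out.println("1473 listening: " + isListening("localhost", 1473, 1000));
        System.out.println("1474 listening: " + isListening("localhost", 1474, 1000));
    }
}
```

If the probe reports 1473 as listening and 1474 as not listening while the agent is running with the corrected config, that would support the conclusion that the reconfiguration was never applied.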

      Attachments

        1. FLUME-1079-1.patch
          1 kB
          Hari Shreedharan
        2. FLUME-1079-2.patch
          2 kB
          Hari Shreedharan

          People

            Assignee: Hari Shreedharan (hshreedharan)
            Reporter: Will McQueen (will@cloudera.com)