Details
Description
Using flume trunk, commit ad24cb31bb1b5a0d1ee4b0ec18572a223ed9d397
Steps:
1) Start with this config in a1.properties:
- a = agent
- r = source
- c = channel
- k = sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1 - ===SOURCES===
a1.sources.r1.type = NETCAT
a1.sources.r1.channels = c1
a1.sources.r1.bind = localhost
a1.sources.r1.port = 1473 - ===CHANNELS===
a1.channels.c1.type = MEMORY - ===SINKS===
a1.sinks.k1.type = NULL
a1.sinks.k1.channel = c1
2) Run the flume node:
bin/flume-ng node --conf conf --conf-file conf/a1.properties --name a1
3) Update the a1.properties file to add a new source a the same port, which would cause a port bind exception on r2 due to r1 already using port 1473:
- a = agent
- r = source
- c = channel
- k = sink
a1.sources = r1 r2
a1.channels = c1
a1.sinks = k1 - ===SOURCES===
a1.sources.r1.type = NETCAT
a1.sources.r1.channels = c1
a1.sources.r1.bind = localhost
a1.sources.r1.port = 1473
a1.sources.r2.type = AVRO
a1.sources.r2.channels = c1
a1.sources.r2.bind = localhost
a1.sources.r2.port = 1473 - ===CHANNELS===
a1.channels.c1.type = MEMORY - ===SINKS===
a1.sinks.k1.type = NULL
a1.sinks.k1.channel = c1
...and updating the props file to the above config results in (after waiting a max of 30 secs for the reconfig to be noticed):
2012-03-28 18:11:24,027 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:205)] Failed to load configuration data. Exception follows.
java.lang.NullPointerException
at org.apache.flume.source.AvroSource.stop(AvroSource.java:137)
at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:45)
at org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise(LifecycleSupervisor.java:155)
at org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.onNodeConfigurationChanged(DefaultLogicalNodeManager.java:66)
at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:217)
at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
4) Now correct the config by changing r2's port to 1474:
- a = agent
- r = source
- c = channel
- k = sink
a1.sources = r1 r2
a1.channels = c1
a1.sinks = k1 - ===SOURCES===
a1.sources.r1.type = NETCAT
a1.sources.r1.channels = c1
a1.sources.r1.bind = localhost
a1.sources.r1.port = 1473
a1.sources.r2.type = AVRO
a1.sources.r2.channels = c1
a1.sources.r2.bind = localhost
a1.sources.r2.port = 1474 - ===CHANNELS===
a1.channels.c1.type = MEMORY - ===SINKS===
a1.sinks.k1.type = NULL
a1.sinks.k1.channel = c1
...but this results in an illegal state:
java.lang.IllegalStateException: Unaware of SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5090d8ea counterGroup:{ name:null counters:
} } - can not unsupervise
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise(LifecycleSupervisor.java:145)
at org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.onNodeConfigurationChanged(DefaultLogicalNodeManager.java:61)
at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:217)
at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
...which tells me that we've entered a permanent bad state that would require restarting the agent.
5) Start the avro-client. We expect the avro-client to connect to the agent (if there would have been no errors in previous steps), but connection is refused:
bin/flume-ng avro-client --cnf --host localhost --port 1474 --filename /home/will/bigdata.txt
2012-03-28 18:27:35,650 (main) [ERROR - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:72)] Unable to open connection to Flume. Exception follows.
org.apache.flume.FlumeException: RPC connection error. Exception follows.
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:114)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:96)
at org.apache.flume.api.NettyAvroRpcClient.access$100(NettyAvroRpcClient.java:50)
at org.apache.flume.api.NettyAvroRpcClient$Builder.build(NettyAvroRpcClient.java:389)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:45)
at org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:120)
at org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:64)
Caused by: java.io.IOException: Error connecting to localhost/127.0.0.1:1474
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:250)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:199)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:148)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:116)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:107)
... 6 more
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.$$YJP$$checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.checkConnect(SocketChannelImpl.java)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:384)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:354)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:276)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
2012-03-28 18:27:35,683 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:77)] Exiting