Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Cannot Reproduce
-
1.9.0
-
None
-
None
Description
Using `org.apache.flume.agent.embedded.EmbeddedAgent`.
{{ Configuration as such:}}
Map<String, String> configurationProperties = ...; service.configure(configurationProperties);
Where `configurationProperties` is set with:
{ "kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000", "processor.type": "load_balance", "sinks": "kafkaSink1", "channel.keep-alive": "0", "channel.checkpointDir": "********************************", "kafkaSink.kafka.producer.reconnect.backoff.ms": "2000", "channel.dataDirs": "********************************", "kafkaSink.kafka.producer.retry.backoff.ms": "1000", "processor.selector.maxTimeOut": "60000", "kafkaSink.kafka.producer.max.request.size": "5485760", "kafkaSink1.flumeBatchSize": "1000", "kafkaSink.kafka.producer.buffer.memory": "67108864", "kafkaSink.kafka.producer.client.id": "********************************", "kafkaSink1.useFlumeEventFormat": "true", "kafkaSink1.kafka.topic": "********************************", "kafkaSink.kafka.producer.batch.size": "8196", "channel.kafka.dataDirs": "********************************", "kafkaSink1.type": "KAFKA", "channel.backupCheckpointDir": "********************************", "kafkaSink1.allowTopicOverride": "true", "channel.useDualCheckpoints": "true", "kafkaSink.kafka.producer.compression.type": "lz4", "processor.maxBackoff": "60000", "use_dual_channel": "true", "channel.capacity": "1000000", "channel.byteCapacityBufferPercentage": "50", "channel.transactionCapacity": "1000", "channel.byteCapacity": "10485760", "channel.type": "file", "processor.backoff": "true", "channel.kafka.checkpointDir": "********************************", "channel.kafka.backupCheckpointDir": "********************************", "kafkaSink1.kafka.bootstrap.servers": "********************************", "kafkaSink.kafka.producer.acks": "-1" }
At runtime it throw the following:
java.lang.NullPointerException at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52) at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927) at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384) at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228) at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153) at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133) at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97) at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40) at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161) at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99) at *****************.startService(*****************)
And flume will not start whatsoever.
The code for `org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52` shows it is looking for property `sinks` which is not empty so it shouldn't throw any such error...
Anyone knows why? No documentation about any of that....