Flume / FLUME-3378

Apache Flume Java client fails to start with a single Kafka sink

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.9.0
    • Fix Version/s: None
    • Component/s: Configuration
    • Labels: None

    Description

      Using `org.apache.flume.agent.embedded.EmbeddedAgent`, configured as follows:

      Map<String, String> configurationProperties = ...;
      service.configure(configurationProperties);

      Where `configurationProperties` is set with:

      {
        "kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
        "processor.type": "load_balance",
        "sinks": "kafkaSink1",
        "channel.keep-alive": "0",
        "channel.checkpointDir": "********************************",
        "kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
        "channel.dataDirs": "********************************",
        "kafkaSink.kafka.producer.retry.backoff.ms": "1000",
        "processor.selector.maxTimeOut": "60000",
        "kafkaSink.kafka.producer.max.request.size": "5485760",
        "kafkaSink1.flumeBatchSize": "1000",
        "kafkaSink.kafka.producer.buffer.memory": "67108864",
        "kafkaSink.kafka.producer.client.id": "********************************",
        "kafkaSink1.useFlumeEventFormat": "true",
        "kafkaSink1.kafka.topic": "********************************",
        "kafkaSink.kafka.producer.batch.size": "8196",
        "channel.kafka.dataDirs": "********************************",
        "kafkaSink1.type": "KAFKA",
        "channel.backupCheckpointDir": "********************************",
        "kafkaSink1.allowTopicOverride": "true",
        "channel.useDualCheckpoints": "true",
        "kafkaSink.kafka.producer.compression.type": "lz4",
        "processor.maxBackoff": "60000",
        "use_dual_channel": "true",
        "channel.capacity": "1000000",
        "channel.byteCapacityBufferPercentage": "50",
        "channel.transactionCapacity": "1000",
        "channel.byteCapacity": "10485760",
        "channel.type": "file",
        "processor.backoff": "true",
        "channel.kafka.checkpointDir": "********************************",
        "channel.kafka.backupCheckpointDir": "********************************",
        "kafkaSink1.kafka.bootstrap.servers": "********************************",
        "kafkaSink.kafka.producer.acks": "-1"
      }
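
      One thing that may be worth checking in this map: `sinks` declares only `kafkaSink1`, yet several keys start with `kafkaSink.` (no `1`), and `use_dual_channel` has no prefix at all. The sketch below lists keys whose first segment is neither a declared sink nor one of a few known top-level prefixes. The class name `SinkPrefixCheck`, the method `unmatchedKeys`, and the prefix list are hypothetical and for illustration only; this is not Flume's own validation logic, and it does not by itself explain the NullPointerException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SinkPrefixCheck {

    // Top-level prefixes assumed for this illustration (not taken from Flume's code).
    private static final Set<String> KNOWN_PREFIXES =
            new HashSet<>(Arrays.asList("channel", "processor", "source"));

    /** Returns keys whose first segment is neither a known prefix nor a declared sink name. */
    public static List<String> unmatchedKeys(Map<String, String> props) {
        // Sink names are the whitespace-separated tokens of the "sinks" value.
        Set<String> declaredSinks = new HashSet<>();
        String sinks = props.get("sinks");
        if (sinks != null) {
            declaredSinks.addAll(Arrays.asList(sinks.trim().split("\\s+")));
        }

        List<String> unmatched = new ArrayList<>();
        for (String key : props.keySet()) {
            if (key.equals("sinks")) {
                continue; // the declaration itself is not a prefixed property
            }
            int dot = key.indexOf('.');
            String head = dot < 0 ? key : key.substring(0, dot);
            if (!KNOWN_PREFIXES.contains(head) && !declaredSinks.contains(head)) {
                unmatched.add(key);
            }
        }
        return unmatched;
    }

    public static void main(String[] args) {
        // A small subset of the map from this report, in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("sinks", "kafkaSink1");
        props.put("kafkaSink1.type", "KAFKA");
        props.put("kafkaSink.kafka.producer.acks", "-1");
        props.put("use_dual_channel", "true");
        props.put("channel.type", "file");

        // Prints: [kafkaSink.kafka.producer.acks, use_dual_channel]
        System.out.println(unmatchedKeys(props));
    }
}
```

      Running the same check over the full map would flag every `kafkaSink.kafka.producer.*` key, which suggests those producer settings may not be reaching the sink named `kafkaSink1`.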

      At runtime it throws the following:

      java.lang.NullPointerException
       at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
       at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
       at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
       at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
       at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
       at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
       at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
       at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
       at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
       at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
       at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
       at *****************.startService(*****************)

      Flume then fails to start at all.

      The code at `org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)` shows that it looks up the property `sinks`, which is not empty here, so it should not throw this error.

      Does anyone know why this happens? I could not find any documentation covering it.

          People

            Assignee: Unassigned
            Reporter: Shahar Frank (srfrnk)