Flume / FLUME-2440

ElasticSearchIndexRequestBuilderFactory

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: v1.5.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels:
    • Environment:

      centos CDH

    • Release Note:
      .
    • Cloudstack-CVSS:
      10

      Description

      I want to sink data to ElasticSearch using the ElasticSearchIndexRequestBuilderFactory option. According to the documentation:
      "The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred."

      But when Flume starts, an error occurs. After checking the source code, I found that ElasticSearchIndexRequestBuilderFactory is just an interface.

      My Flume configuration file is as follows:
      agent1.sources=source1
      agent1.sinks=sink1
      agent1.channels=channel1

      agent1.sources.source1.type=spooldir
      agent1.sources.source1.spoolDir=/root/flume_test/spl
      agent1.sources.source1.channels=channel1
      agent1.sources.source1.fileHeader = false

      agent1.sinks.sink1.channel=channel1
      agent1.sinks.sink1.type=org.apache.flume.sink.elasticsearch.ElasticSearchSink
      #agent1.sinks.sink1.type=elasticsearch
      agent1.sinks.sink1.hostNames=192.168.106.27:9300
      agent1.sinks.sink1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory
      #agent1.sinks.sink1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

      agent1.channels.channel1.type=file
      agent1.channels.channel1.checkpointDir=/root/flume_test/ck
      agent1.channels.channel1.dataDirs=/root/flume_test/data

      The error is:

      14/08/04 10:04:38 INFO source.DefaultSourceFactory: Creating instance of source source1, type spooldir
      14/08/04 10:04:38 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
      14/08/04 10:04:38 ERROR elasticsearch.ElasticSearchSink: Could not instantiate event serializer.
      java.lang.InstantiationException: org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory
      at java.lang.Class.newInstance(Class.java:359)
      at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:295)
      at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
      at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
      at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
      at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:744)
      14/08/04 10:04:38 ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
      java.lang.RuntimeException: java.lang.InstantiationException: org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory
      at com.google.common.base.Throwables.propagate(Throwables.java:156)
      at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:309)
      at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
      at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
      at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
      at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:744)
      Caused by: java.lang.InstantiationException: org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory
      at java.lang.Class.newInstance(Class.java:359)
      at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:295)

      Please help me!

      Thanks

        Activity

        tu gongxuan created issue -
        tu gongxuan added a comment -

        Sorry for the mistake; the flume-ng version is 1.4.0.

        tu gongxuan made changes -
        Field Original Value New Value
        Original Estimate 168h [ 604800 ] 72h [ 259200 ]
        Remaining Estimate 168h [ 604800 ] 72h [ 259200 ]
        tu gongxuan made changes -
        Original Estimate 72h [ 259200 ] 36h [ 129600 ]
        Remaining Estimate 72h [ 259200 ] 36h [ 129600 ]
        Hari Shreedharan added a comment -

        You need to implement the interface, put the implementation on the classpath, and specify the implementation's FQCN as the serializer.
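
        To illustrate why the configuration in the description fails: the stack trace shows ElasticSearchSink.configure calling Class.newInstance on the configured class name, and that call cannot succeed for an interface. The following is a minimal standalone sketch of that behaviour, not Flume code. `Factory` and `MyFactory` are hypothetical stand-ins for ElasticSearchIndexRequestBuilderFactory and a user-written implementation, and the `build` method is invented for the example.

```java
public class InstantiateDemo {

    /** Hypothetical stand-in for an interface such as ElasticSearchIndexRequestBuilderFactory. */
    public interface Factory {
        String build(String event);
    }

    /** Hypothetical concrete implementation with an implicit public no-arg constructor. */
    public static class MyFactory implements Factory {
        @Override
        public String build(String event) {
            return "index:" + event;
        }
    }

    /**
     * Loads a class by name and instantiates it reflectively, the way a
     * config-driven component typically turns a configured FQCN into an object.
     * Class.newInstance is deprecated since Java 9 but is used here on purpose,
     * because it is the call that appears in the stack trace.
     */
    @SuppressWarnings("deprecation")
    public static String tryInstantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.newInstance();
            return "OK " + instance.getClass().getSimpleName();
        } catch (InstantiationException e) {
            // Thrown for interfaces and abstract classes: there is nothing to construct.
            return "InstantiationException";
        } catch (ReflectiveOperationException e) {
            // ClassNotFoundException or IllegalAccessException.
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Naming the interface itself fails, as in the reported error.
        System.out.println(tryInstantiate(Factory.class.getName()));
        // Naming a concrete implementation succeeds.
        System.out.println(tryInstantiate(MyFactory.class.getName()));
    }
}
```

        Running `main` prints `InstantiationException` for the interface and `OK MyFactory` for the concrete class, which is the same distinction the sink hits when the serializer property names the interface instead of an implementation.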

        Hari Shreedharan made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Resolution Not a Problem [ 8 ]
        tu gongxuan added a comment -

        Why is ElasticSearchEventSerializer implemented in Flume, but ElasticSearchIndexRequestBuilderFactory is not?

        tu gongxuan made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Edward Sargisson added a comment -

        tu gongxuan: Because they're places where you can add a customisation with your own code, if that is what you need.
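
        As a sketch of what this could look like in practice, the sink section of the reporter's configuration might be changed as follows. `com.example.MyIndexRequestBuilderFactory` is a hypothetical name for a user-written class that implements ElasticSearchIndexRequestBuilderFactory; it is not shipped with Flume, and the jar containing it is assumed to be on the agent's classpath. The alternative is the concrete serializer class already named in the reporter's commented-out line.

```properties
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.type=org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.sink1.hostNames=192.168.106.27:9300
# Hypothetical user-written class implementing ElasticSearchIndexRequestBuilderFactory
agent1.sinks.sink1.serializer=com.example.MyIndexRequestBuilderFactory
# Alternative: the concrete serializer from the reporter's commented-out line
#agent1.sinks.sink1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
```

        Either way, the serializer property has to name a concrete class with a no-argument constructor rather than the interface itself.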

        Transition           Time In Source Status   Execution Times   Last Executer      Last Execution Date
        Open → Resolved      15h 58m                 1                 Hari Shreedharan   04/Aug/14 19:18
        Resolved → Closed    2d 15h 40m              1                 tu gongxuan        07/Aug/14 10:59

          People

          • Assignee: Unassigned
          • Reporter: tu gongxuan
          • Votes: 0
          • Watchers: 3


              Time Tracking

              Estimated: 36h
              Remaining: 36h
              Logged: Not Specified
