  Flume / FLUME-2718

HTTP Source to support generic Stream Handler

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.7.0
    • Component/s: Sinks+Sources
    • Labels:
      None
    • Flags:
      Patch

      Description

      Currently the HTTP Source uses JSONHandler as its default handler. A more generic approach would be a BLOBHandler that accepts any request input stream and loads the whole stream as the Event payload. In addition, this handler lets you define mandatory request parameters and maps those parameters into Event headers.
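The handler behaviour described above can be illustrated with a small, dependency-free Java sketch. It is hypothetical and is not the BLOBHandler patch: it does not implement Flume's HTTPSourceHandler or Event interfaces (SimpleEvent is a stand-in), and it assumes that the first value of every request parameter is copied into a header, that a missing mandatory parameter rejects the request, and that a setting such as "basepath, filename" is split on commas and trimmed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch of the BLOB idea. It does NOT use
// Flume's HTTPSourceHandler or Event types; SimpleEvent is a stand-in.
public class BlobSketch {

    // Minimal stand-in for a Flume event: string headers plus a byte[] body.
    public static final class SimpleEvent {
        public final Map<String, String> headers;
        public final byte[] body;

        SimpleEvent(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Assumed parsing of a value such as "basepath, filename": split on commas, trim blanks.
    public static List<String> parseMandatory(String csv) {
        List<String> names = new ArrayList<>();
        for (String part : csv.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Builds one event: request parameters become headers, the whole body becomes the payload.
    public static SimpleEvent toEvent(Map<String, String[]> parameters,
                                      List<String> mandatory,
                                      InputStream body) throws IOException {
        Map<String, String> headers = new HashMap<>();
        // Copy the first value of each request parameter into a header (assumption).
        for (Map.Entry<String, String[]> p : parameters.entrySet()) {
            String[] values = p.getValue();
            if (values != null && values.length > 0) {
                headers.put(p.getKey(), values[0]);
            }
        }
        // Reject the request if any mandatory parameter is missing.
        for (String name : mandatory) {
            if (!headers.containsKey(name)) {
                throw new IllegalArgumentException("Missing mandatory parameter: " + name);
            }
        }
        // Buffer the entire stream in memory as the event payload (readAllBytes is Java 9+).
        byte[] payload = body.readAllBytes();
        return new SimpleEvent(headers, payload);
    }

    public static void main(String[] args) throws IOException {
        Map<String, String[]> params = new HashMap<>();
        params.put("basepath", new String[] {"/data/"});
        params.put("filename", new String[] {"test.json"});
        InputStream in = new ByteArrayInputStream("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        SimpleEvent e = toEvent(params, parseMandatory("basepath, filename"), in);
        System.out.println(e.headers.get("filename") + " " + e.body.length); // test.json 7
    }
}
```

Because the payload is read fully into memory, a design like this suits moderately sized uploads rather than very large objects.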

      This way, HTTPSource can be used as a generic data ingress endpoint for any sink: the client supplies attributes such as basepath, filename and timestamp as request parameters, and the sink properties reference those values as Event headers.
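Sink properties refer to headers with %{name} references, as in hdfs.path = %{basepath}/%Y/%m/%d. The Java sketch below is a simplified, hypothetical illustration of that substitution and is not Flume's own escaping code: it only resolves %{...} references, leaves time escapes such as %Y untouched, and substitutes an empty string for absent headers (an assumption).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical illustration of %{header} substitution.
// Not Flume's escaping code: %Y/%m/%d and other time escapes are left as-is,
// and headers that are absent are replaced by "" (assumption).
public class HeaderEscapeSketch {

    private static final Pattern HEADER_REF = Pattern.compile("%\\{([^}]+)\\}");

    public static String resolve(String template, Map<String, String> headers) {
        Matcher m = HEADER_REF.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            // quoteReplacement stops '$' or '\' in header values being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("basepath", "/data/");
        headers.put("filename", "test.json");
        System.out.println(resolve("%{basepath}/%Y/%m/%d", headers)); // /data//%Y/%m/%d
        System.out.println(resolve("%{filename}", headers));           // test.json
    }
}
```

The doubled slash appears because basepath=/data/ already ends in a slash; sending basepath=/data avoids it in the resolved string.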

      All this can be done without developing any custom Handler code.

      For example:

      With the agent configuration below, you can send any type of data (JSON/CSV/TSV) and store it in any sink; this example uses HDFS.

      sample command
      curl -v -X POST -H "Content-Type: application/octet-stream" "http://testHost:8080/?basepath=/data/&filename=test.json&timestamp=1434101498275" --data-binary @test.json
      
      HDFS data path
      /data/2015/06/12/test.json.1434101498275.lzo
      
      agent.conf
      #Agent configuration
      #HTTP Source configuration
      agent.sources.httpSrc.type = http
      agent.sources.httpSrc.channels = memChannel
      agent.sources.httpSrc.bind = testHost
      agent.sources.httpSrc.port = 8080
      agent.sources.httpSrc.handler = org.apache.flume.source.http.BLOBHandler
      agent.sources.httpSrc.handler.mandatoryParameters = basepath, filename
      
      #Memory channel configuration
      agent.channels.memChannel.type = memory
      agent.channels.memChannel.capacity = 100000
      agent.channels.memChannel.transactionCapacity = 1000
      
      #HDFS Sink configuration
      agent.sinks.hdfsSink.type = hdfs
      agent.sinks.hdfsSink.hdfs.path = %{basepath}/%Y/%m/%d
      agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
      agent.sinks.hdfsSink.hdfs.filePrefix = %{filename}
      agent.sinks.hdfsSink.hdfs.fileType = CompressedStream
      agent.sinks.hdfsSink.hdfs.codeC = lzop
      agent.sinks.hdfsSink.channel = memChannel
      
      #Finally, activate.
      agent.channels = memChannel
      agent.sources = httpSrc
      agent.sinks = hdfsSink
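For clients that are not shell scripts, the sample curl command can be reproduced with the JDK's built-in HTTP client (Java 11+). This is a hypothetical sketch: the class and method names are illustrative, the Content-Type of application/octet-stream is an assumption chosen so the body is presumably not parsed as form data, and testHost:8080 matches the configuration above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical Java equivalent of the sample curl command.
public class BlobPostSketch {

    // Builds http://host:port/?k=v&... with URL-encoded keys and values.
    static URI buildUri(String host, int port, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> p : params.entrySet()) {
            query.add(URLEncoder.encode(p.getKey(), StandardCharsets.UTF_8)
                    + "=" + URLEncoder.encode(p.getValue(), StandardCharsets.UTF_8));
        }
        return URI.create("http://" + host + ":" + port + "/?" + query);
    }

    // POSTs the raw file bytes as the request body (assumed content type).
    static HttpRequest buildRequest(URI uri, Path file) throws IOException {
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/octet-stream")
                .POST(HttpRequest.BodyPublishers.ofFile(file))
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Map<String, String> params = new LinkedHashMap<>(); // keeps parameter order
        params.put("basepath", "/data/");
        params.put("filename", "test.json");
        params.put("timestamp", "1434101498275");
        URI uri = buildUri("testHost", 8080, params);
        HttpRequest request = buildRequest(uri, Path.of("test.json"));
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}
```

URLEncoder turns /data/ into %2Fdata%2F; the server decodes it back, so the basepath header still reads /data/.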
      

    People

    • Assignee: hariprasad kuppuswamy (Hari)
    • Reporter: hariprasad kuppuswamy (Hari)
    • Votes: 1
    • Watchers: 5
