Flume / FLUME-418

Add batching and compression arguments to agent and collectors

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.4
    • Component/s: Node
    • Labels:
      None

      Description

      Currently it is difficult to enable batching and compression in E2E mode. Add keyword arguments that enable batching and compression on the agent side, and decompression and unbatching on the collector side.
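To illustrate the agent/collector symmetry described above, here is a minimal Python sketch of a batch being packed and compressed on one side and decompressed and unpacked on the other. It is not Flume's implementation: the function names `pack_batch` and `unpack_batch` are hypothetical, and the use of JSON and gzip is an assumption, since this issue does not describe Flume's wire format or codec.

```python
import gzip
import json


def pack_batch(events, compress=True):
    """Agent side (hypothetical): serialize a list of event bodies into one
    payload and optionally gzip it. JSON/gzip are illustrative assumptions."""
    payload = json.dumps(events).encode("utf-8")
    return gzip.compress(payload) if compress else payload


def unpack_batch(payload, compressed=True):
    """Collector side (hypothetical): reverse pack_batch by decompressing
    (if needed) and splitting the payload back into individual events."""
    raw = gzip.decompress(payload) if compressed else payload
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    events = ["event-%d" % i for i in range(5)]
    wire = pack_batch(events, compress=True)
    # The collector must apply the inverse steps in the opposite order.
    print(unpack_batch(wire, compressed=True) == events)
```

The point of the sketch is only that the collector has to undo the agent's steps in reverse order: decompress first, then unbatch.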

          Activity

          jmhsieh Jonathan Hsieh added a comment -

          Here are the new keyword arguments for agentSink, agentE2ESink, agentBESink and agentDFOSink.

          agent*Sink("collectorhost" [,port] {, batchN=size, batchLatency=ms, compression=false} )

          For example:
          agentSink("collectorhost", compression=true) // compresses individual events
          agentSink("collectorhost", compression=true, batchN=100) // compresses batches of 100 events
          agentSink("collectorhost", batchN=100, batchLatency=5000) // batches of 100 events, or fewer if 5000 ms elapses first
          agentSink("collectorhost", batchN=100, batchLatency=5000, compression=true) // compressed batches of 100 events, or fewer if 5000 ms elapses first

          Screenshots of this in action are attached. I am going to file follow-on JIRAs for the chains and for the auto agents.
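The "N events or a latency timeout, whichever comes first" behaviour of batchN and batchLatency can be sketched roughly as follows. This is an illustrative Python model, not Flume's code: the `Batcher` class, its `tick` method, and the choice to measure latency from the first event in the pending batch are all assumptions made for the example.

```python
import time


class Batcher:
    """Hypothetical model of batchN/batchLatency: emit a batch when it holds
    batch_n events, or when batch_latency_ms has passed since its first event."""

    def __init__(self, batch_n, batch_latency_ms, emit, clock=time.monotonic):
        self.batch_n = batch_n
        self.batch_latency = batch_latency_ms / 1000.0  # seconds
        self.emit = emit          # callback that receives a list of events
        self.clock = clock        # injectable so the timeout can be tested
        self.pending = []
        self.first_at = None

    def append(self, event):
        if not self.pending:
            self.first_at = self.clock()  # start the latency timer
        self.pending.append(event)
        self._maybe_flush()

    def tick(self):
        """Call periodically so a partial batch is flushed after the timeout."""
        self._maybe_flush()

    def _maybe_flush(self):
        if not self.pending:
            return
        full = len(self.pending) >= self.batch_n
        stale = self.clock() - self.first_at >= self.batch_latency
        if full or stale:
            self.emit(self.pending)
            self.pending = []
            self.first_at = None


if __name__ == "__main__":
    batches = []
    b = Batcher(batch_n=100, batch_latency_ms=5000, emit=batches.append)
    for i in range(250):
        b.append(i)
    print(len(batches))  # two full batches so far; 50 events still pending
```

A real sink would drive `tick` from a timer thread or the event loop; here it is left to the caller to keep the sketch small.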

          jmhsieh Jonathan Hsieh added a comment -

          Attached images of the agent and collector with batching and compression metrics displayed.

          jmhsieh Jonathan Hsieh added a comment - edited

          First cut.

          Works properly for E2E:
          exec config node2 'exec("./test-endtoend/count-forever 1")' 'agentE2ESink("localhost",batchN=5,compression=true)'
          exec config node3 'collectorSource' 'collector(30000) { console("raw") }'

          BE gets stuck:
          exec config node2 'exec("./test-endtoend/count-forever 1")' 'agentBESink("localhost",batchN=5,compression=true)'
          exec config node3 'collectorSource' 'collector(30000) { console("raw") }'

          DFO (main path dies, but backup seems to succeed):
          exec config node2 'exec("./test-endtoend/count-forever 1")' 'agentDFOSink("localhost",batchN=5,compression=true)'
          exec config node3 'collectorSource' 'collector(30000) { console("raw") }'

          jmhsieh Jonathan Hsieh added a comment -

          Review here: https://review.cloudera.org/r/1709/

          jmhsieh Jonathan Hsieh added a comment -

          Final version.

          jmhsieh Jonathan Hsieh added a comment -

          Committed.


            People

            • Assignee: jmhsieh Jonathan Hsieh
            • Reporter: jmhsieh Jonathan Hsieh