Flume / FLUME-2069

Issue with Flume load balancing round robin

    Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: v1.3.1
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None

      Description

      I am not sure if this is a bug. I have an HTTP-to-HDFS scenario with 3 channels and 3 HDFS sinks, configured to load balance using round robin. However, when a request arrives it gets replicated to all the sinks instead of hitting one sink in round-robin order, so I end up with the same data replicated by every sink.

      Here are my configs (source configs not included):

      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
      a1.sinks.k1.hdfs.fileType = DataStream
      a1.sinks.k1.hdfs.writeFormat = Text
      a1.sinks.k1.hdfs.filePrefix = events
      a1.sinks.k1.hdfs.batchSize = 1000

      a1.sinks.k2.type = hdfs
      a1.sinks.k2.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
      a1.sinks.k2.hdfs.fileType = DataStream
      a1.sinks.k2.hdfs.writeFormat = Text
      a1.sinks.k2.hdfs.filePrefix = events
      a1.sinks.k2.hdfs.batchSize = 1000

      a1.sinks.k3.type = hdfs
      a1.sinks.k3.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
      a1.sinks.k3.hdfs.fileType = DataStream
      a1.sinks.k3.hdfs.writeFormat = Text
      a1.sinks.k3.hdfs.filePrefix = events
      a1.sinks.k3.hdfs.batchSize = 1000

      a1.sinkgroups = g1
      a1.sinkgroups.g1.sinks = k1 k2 k3
      a1.sinkgroups.g1.processor.type = load_balance
      a1.sinkgroups.g1.processor.selector = round_robin

      Am I missing something here?

        Activity

        Hari Shreedharan added a comment -

        That is just a timeout; increasing the timeout will fix it. You should also have each sink write to a different directory or use different file prefixes.
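
        A minimal sketch of that suggestion, assuming the reporter's agent and sink names (a1, k1-k3). hdfs.callTimeout raises the 10-second default behind the "Callable timed out after 10000 ms" error, and distinct filePrefix values keep the sinks from racing on the same .tmp file:

        # Hypothetical values: raise the HDFS call timeout (default is 10000 ms)
        a1.sinks.k1.hdfs.callTimeout = 60000
        a1.sinks.k2.hdfs.callTimeout = 60000
        a1.sinks.k3.hdfs.callTimeout = 60000

        # Give each sink its own file prefix so they do not collide on one file
        a1.sinks.k1.hdfs.filePrefix = events-k1
        a1.sinks.k2.hdfs.filePrefix = events-k2
        a1.sinks.k3.hdfs.filePrefix = events-k3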

        Osama Awad added a comment -

        After changing the HDFS path for each sink to be different, it worked and I can see the data split. It seems that having multiple sinks write to the same HDFS path will not work, right?

        Osama Awad added a comment - edited

        I have modified the file like this, but I am getting errors when a request arrives:

        13/06/06 13:32:09 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
        13/06/06 13:32:09 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
        13/06/06 13:32:09 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
        13/06/06 13:32:09 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
        13/06/06 13:32:09 INFO hdfs.BucketWriter: Creating /tmp/hadoop-oawad/dfs/name2/13-06-06/1330/00/events.1370543529674.tmp
        13/06/06 13:32:09 INFO hdfs.BucketWriter: Creating /tmp/hadoop-oawad/dfs/name2/13-06-06/1330/00/events.1370543529674.tmp
        13/06/06 13:32:09 INFO hdfs.BucketWriter: Creating /tmp/hadoop-oawad/dfs/name2/13-06-06/1330/00/events.1370543529674.tmp
        13/06/06 13:32:09 INFO hdfs.BucketWriter: Creating /tmp/hadoop-oawad/dfs/name2/13-06-06/1330/00/events.1370543529674.tmp
        13/06/06 13:32:19 WARN hdfs.HDFSEventSink: HDFS IO error
        java.io.IOException: Callable timed out after 10000 ms
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:346)
        at org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:727)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:427)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
        Caused by: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
        ... 5 more

        a1.sources = r1
        a1.sinks = k1 k2 k3 k4
        a1.channels = c1

        a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
        a1.sources.r1.port = 5140
        a1.sources.r1.channels = c1
        a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

        a1.sources.r1.handler.nickname = random props
        a1.sources.r1.interceptors = timestamp logging
        a1.sources.r1.interceptors.logging.type = com.xyz.flume.interceptors.LoggingInterceptor$Builder
        a1.sources.r1.interceptors.timestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k1.hdfs.fileType = DataStream
        a1.sinks.k1.hdfs.writeFormat = Text
        a1.sinks.k1.hdfs.filePrefix = events
        a1.sinks.k1.hdfs.batchSize = 1000
        a1.sinks.k1.hdfs.round = true
        a1.sinks.k1.hdfs.roundValue = 10
        a1.sinks.k1.hdfs.roundUnit = minute
        a1.sinks.k1.hdfs.threadsPoolSize = 100
        a1.sinks.k1.hdfs.rollSize = 6144
        a1.sinks.k1.hdfs.rollCount = 20

        a1.sinks.k2.type = hdfs
        a1.sinks.k2.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k2.hdfs.fileType = DataStream
        a1.sinks.k2.hdfs.writeFormat = Text
        a1.sinks.k2.hdfs.filePrefix = events
        a1.sinks.k2.hdfs.batchSize = 1000
        a1.sinks.k2.hdfs.round = true
        a1.sinks.k2.hdfs.roundValue = 10
        a1.sinks.k2.hdfs.roundUnit = minute
        a1.sinks.k2.hdfs.threadsPoolSize = 100
        a1.sinks.k2.hdfs.rollSize = 6144
        a1.sinks.k2.hdfs.rollCount = 20

        a1.sinks.k3.type = hdfs
        a1.sinks.k3.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k3.hdfs.fileType = DataStream
        a1.sinks.k3.hdfs.writeFormat = Text
        a1.sinks.k3.hdfs.filePrefix = events
        a1.sinks.k3.hdfs.batchSize = 1000
        a1.sinks.k3.hdfs.round = true
        a1.sinks.k3.hdfs.roundValue = 10
        a1.sinks.k3.hdfs.roundUnit = minute
        a1.sinks.k3.hdfs.threadsPoolSize = 100
        a1.sinks.k3.hdfs.rollSize = 6144
        a1.sinks.k3.hdfs.rollCount = 20

        a1.sinks.k4.type = hdfs
        a1.sinks.k4.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k4.hdfs.fileType = DataStream
        a1.sinks.k4.hdfs.writeFormat = Text
        a1.sinks.k4.hdfs.filePrefix = events
        a1.sinks.k4.hdfs.batchSize = 1000
        a1.sinks.k4.hdfs.round = true
        a1.sinks.k4.hdfs.roundValue = 10
        a1.sinks.k4.hdfs.roundUnit = minute
        a1.sinks.k4.hdfs.threadsPoolSize = 100
        a1.sinks.k4.hdfs.rollSize = 6144
        a1.sinks.k4.hdfs.rollCount = 20

        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c1
        a1.sinks.k3.channel = c1
        a1.sinks.k4.channel = c1

        Hari Shreedharan added a comment -

        That is meant to load balance between several different hosts on the next tier - usually for RPC calls, to reduce the load on any one machine and to make sure that data is sent to all machines in the next tier.
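
        For context, a minimal sketch of the setup he describes, assuming two hypothetical collector hosts on the next tier reached over Avro RPC (hostnames and port are placeholders):

        a1.sinks = k1 k2
        a1.channels = c1

        # Avro sinks pointing at two collector agents on the next tier
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = collector1.example.com
        a1.sinks.k1.port = 4141
        a1.sinks.k1.channel = c1

        a1.sinks.k2.type = avro
        a1.sinks.k2.hostname = collector2.example.com
        a1.sinks.k2.port = 4141
        a1.sinks.k2.channel = c1

        # Spread events from c1 across the two hosts in round-robin order
        a1.sinkgroups = g1
        a1.sinkgroups.g1.sinks = k1 k2
        a1.sinkgroups.g1.processor.type = load_balance
        a1.sinkgroups.g1.processor.selector = round_robin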

        Osama Awad added a comment -

        I was thinking about having 4 channels with a sink polling from each to scale the load in Flume. So, if I have one channel with 4 sinks polling from it, then what is

        a1.sinkgroups = g1
        a1.sinkgroups.g1.sinks = k1 k2 k3 k4
        a1.sinkgroups.g1.processor.type = load_balance
        a1.sinkgroups.g1.processor.selector = round_robin

        used for?

        Hari Shreedharan added a comment -

        You can simply use the same channel for all 4 sinks. The sinks will pick up from the same channel, and each sink will get different events. You don't need a sink group if you are using the sinks in this fashion (all 4 will run simultaneously; if you use a sink group, only one sink will read at a time. Depending on which behavior you want, add or remove the sink group).
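
        A minimal sketch of that wiring, using the same agent and sink names as above. With no sink group defined, all four sinks drain c1 in parallel and each event goes to exactly one sink:

        a1.sinks = k1 k2 k3 k4
        a1.channels = c1

        # All four sinks read from the single channel; no sinkgroups entry,
        # so the sinks run simultaneously and share the events in c1
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c1
        a1.sinks.k3.channel = c1
        a1.sinks.k4.channel = c1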

        Osama Awad added a comment -

        I understand this. I am not trying to multiplex based on header values; I am trying to load balance requests in a round-robin fashion. Is that supported?

        Hari Shreedharan added a comment -

        Please take a look at:
        http://flume.apache.org/FlumeUserGuide.html#fan-out-flow

        You also need to specify the headers and the values on which to multiplex.
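
        A minimal sketch of a multiplexing selector, assuming a hypothetical header named datacenter and the channel names already used in this thread. Events whose header value matches a mapping go to that channel; everything else falls back to the default:

        a1.sources.r1.selector.type = multiplexing
        # "datacenter" is a hypothetical header set by the client or an interceptor
        a1.sources.r1.selector.header = datacenter
        a1.sources.r1.selector.mapping.east = c1
        a1.sources.r1.selector.mapping.west = c2
        a1.sources.r1.selector.default = c3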

        Osama Awad added a comment -

        So how do I change it? How do I make it multiplex instead of replicating? I tried adding

        a1.sources.r1.selector.type = multiplexing

        but it doesn't seem to work.

        Jayant Shekhar added a comment -

        Hi Osama. In Flume the default fan-out flow is replicating, hence the event is getting replicated to all the channels.
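
        For reference, a sketch of that default written out explicitly (using the channel names c1 c2 c3 c4 from the reporter's setup); the replicating selector copies every event to all listed channels:

        a1.sources.r1.selector.type = replicating
        a1.sources.r1.channels = c1 c2 c3 c4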

        Osama Awad added a comment -

        Ideas?

        Osama Awad added a comment - edited

        Hello Jayant, here is the complete file:

        a1.sources = r1
        a1.sinks = k1 k2 k3 k4
        a1.channels = c1 c2 c3 c4

        a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
        a1.sources.r1.port = 5140
        a1.sources.r1.channels = c1 c2 c3 c4
        a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

        a1.sources.r1.interceptors = logging timestamp
        a1.sources.r1.interceptors.logging.type = com.xyz.flume.interceptors.LoggingInterceptor$Builder
        a1.sources.r1.interceptors.timestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k1.hdfs.fileType = DataStream
        a1.sinks.k1.hdfs.writeFormat = Text
        a1.sinks.k1.hdfs.filePrefix = events
        a1.sinks.k1.hdfs.batchSize = 1000
        a1.sinks.k1.hdfs.round = true
        a1.sinks.k1.hdfs.roundValue = 10
        a1.sinks.k1.hdfs.roundUnit = minute
        a1.sinks.k1.hdfs.threadsPoolSize = 100
        a1.sinks.k1.hdfs.rollSize = 6144
        a1.sinks.k1.hdfs.rollCount = 20

        a1.sinks.k2.type = hdfs
        a1.sinks.k2.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k2.hdfs.fileType = DataStream
        a1.sinks.k2.hdfs.writeFormat = Text
        a1.sinks.k2.hdfs.filePrefix = events
        a1.sinks.k2.hdfs.batchSize = 1000
        a1.sinks.k2.hdfs.round = true
        a1.sinks.k2.hdfs.roundValue = 10
        a1.sinks.k2.hdfs.roundUnit = minute
        a1.sinks.k2.hdfs.threadsPoolSize = 100
        a1.sinks.k2.hdfs.rollSize = 6144
        a1.sinks.k2.hdfs.rollCount = 20

        a1.sinks.k3.type = hdfs
        a1.sinks.k3.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k3.hdfs.fileType = DataStream
        a1.sinks.k3.hdfs.writeFormat = Text
        a1.sinks.k3.hdfs.filePrefix = events
        a1.sinks.k3.hdfs.batchSize = 1000
        a1.sinks.k3.hdfs.round = true
        a1.sinks.k3.hdfs.roundValue = 10
        a1.sinks.k3.hdfs.roundUnit = minute
        a1.sinks.k3.hdfs.threadsPoolSize = 100
        a1.sinks.k3.hdfs.rollSize = 6144
        a1.sinks.k3.hdfs.rollCount = 20

        a1.sinks.k4.type = hdfs
        a1.sinks.k4.hdfs.path = /tmp/hadoop-oawad/dfs/name2/%y-%m-%d/%H%M/%S
        a1.sinks.k4.hdfs.fileType = DataStream
        a1.sinks.k4.hdfs.writeFormat = Text
        a1.sinks.k4.hdfs.filePrefix = events
        a1.sinks.k4.hdfs.batchSize = 1000
        a1.sinks.k4.hdfs.round = true
        a1.sinks.k4.hdfs.roundValue = 10
        a1.sinks.k4.hdfs.roundUnit = minute
        a1.sinks.k4.hdfs.threadsPoolSize = 100
        a1.sinks.k4.hdfs.rollSize = 6144
        a1.sinks.k4.hdfs.rollCount = 20

        a1.sinkgroups = g1
        a1.sinkgroups.g1.sinks = k1 k2 k3 k4
        a1.sinkgroups.g1.processor.type = load_balance
        a1.sinkgroups.g1.processor.selector = round_robin

        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000

        a1.channels.c2.type = memory
        a1.channels.c2.capacity = 1000000
        a1.channels.c2.transactionCapacity = 1000

        a1.channels.c3.type = memory
        a1.channels.c3.capacity = 1000000
        a1.channels.c3.transactionCapacity = 1000

        a1.channels.c4.type = memory
        a1.channels.c4.capacity = 1000000
        a1.channels.c4.transactionCapacity = 1000

        a1.sources.r1.channels = c1 c2 c3 c4
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c2
        a1.sinks.k3.channel = c3
        a1.sinks.k4.channel = c4

        Jayant Shekhar added a comment - edited

        Hi Osama, can you also add the source and channel configs? I am also interested in seeing how the sinks are connected to the channels.


          People

          • Assignee: Unassigned
          • Reporter: Osama Awad
          • Votes: 0
          • Watchers: 3
