  1. Flume
  2. FLUME-3358

The Kafka channel does not work properly when the interceptor filters a batch down to an empty event list

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Not A Bug
    • Affects Version/s: 1.9.0
    • Fix Version/s: None
    • Component/s: Kafka Channel
    • Labels: None

      Description

      I have a requirement to stream Kafka topics and filter them according to business rules. When the filter leaves the List<Event> empty, the whole pipeline stops working properly.

      The logic looks like this:

      String key = JsonPath.read(message, "$.key");
      switch (key) {
          case "test1": return process(key, event);
          case "test2": return process(key, event);
          case "test3": return process(key, event);
          default: return null;
      }

      When all of the data in a queue is filtered out, this pipeline (Kafka -> HDFS) stays in an abnormal state.
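
      The report does not show how the custom interceptor handles a whole batch, so the following is only a sketch of one way to filter events so that the batch method always returns a list (possibly empty) and never null. SimpleEvent, interceptOne, and interceptBatch are hypothetical stand-ins so the example runs without Flume on the classpath; a real interceptor would implement org.apache.flume.interceptor.Interceptor and operate on org.apache.flume.Event. The key is also read from a header here, as an assumption, instead of being parsed with JsonPath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FilterSketch {

    /** Hypothetical stand-in for org.apache.flume.Event so the sketch runs without Flume. */
    static final class SimpleEvent {
        final Map<String, String> headers;
        final String body;

        SimpleEvent(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    /** Per-event filter: returns the event to keep it, or null to drop it. */
    static SimpleEvent interceptOne(SimpleEvent event) {
        // Assumption: the routing key is already in a header; the report parses it from JSON.
        String key = event.headers.get("key");
        if (key == null) {
            return null;
        }
        switch (key) {
            case "test1":
            case "test2":
            case "test3":
                return event;
            default:
                return null;
        }
    }

    /** Batch filter: collects surviving events; the result may be empty but is never null. */
    static List<SimpleEvent> interceptBatch(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent result = interceptOne(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Every event in this batch carries a key that the filter rejects.
        List<SimpleEvent> batch = List.of(
                new SimpleEvent(Map.of("key", "other"), "a"),
                new SimpleEvent(Map.of("key", "unknown"), "b"));
        List<SimpleEvent> kept = interceptBatch(batch);
        System.out.println("kept " + kept.size() + " of " + batch.size() + " events");
    }
}
```

      Returning null from the per-event method is the usual way to drop a single event; the point of the sketch is only that the batch method builds a fresh list from the survivors, so a fully filtered batch comes back as an empty list. Whether this matches the failure in the attached screenshot cannot be determined from the report.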

       

      This is my configuration:

      # device flume
      Test.sources = r1
      Test.sinks = k1
      Test.channels = c1

      Test.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
      Test.sources.r1.kafka.bootstrap.servers = xxxx
      Test.sources.r1.topic = xxx
      Test.sources.r1.groupId = test_lab_1
      Test.sources.r1.kafka.consumer.timeout.ms = 100
      Test.sources.r1.interceptors = i1
      # This is my custom interceptor
      Test.sources.r1.interceptors.i1.type = com.goe.DeviceUsageDeserializerInterceptor$Builder

      # Describe the sink
      Test.sinks.k1.type = hdfs
      Test.sinks.k1.hdfs.path = /user/naming/%{DeviceDir}
      Test.sinks.k1.hdfs.filePrefix = device-
      Test.sinks.k1.hdfs.fileSuffix = .csv
      Test.sinks.k1.hdfs.inUseSuffix = .tmp
      Test.sinks.k1.hdfs.idleTimeout = 120
      Test.sinks.k1.hdfs.writeFormat = Text
      Test.sinks.k1.hdfs.batchSize = 100
      Test.sinks.k1.hdfs.threadsPoolSize = 10
      Test.sinks.k1.hdfs.rollSize = 0
      Test.sinks.k1.hdfs.rollCount = 0

      # Use a channel which buffers events in memory
      Test.channels.c1.type = memory
      Test.channels.c1.capacity = 10000
      Test.channels.c1.transactionCapacity = 1000

      # Bind the source and sink to the channel
      Test.sources.r1.channels = c1
      Test.sinks.k1.channel = c1

      This is the exception (see the attached exception.jpg):

        Attachments

        1. exception.jpg
          388 kB
          huaicui

            People

            • Assignee: Unassigned
            • Reporter: kofiori huaicui