FLUME-3071

Multiplexing channel selector is not working properly in Flume 1.7


    Details

    • Type: Request
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.7.0
    • Fix Version/s: 1.7.0
    • Component/s: Configuration
    • Labels:
      None
    • Environment:

      Environment Details
      Hadoop 2.5.2
      Flume 1.7
      Hive 0.14

      Description

      I am using the multiplexing channel selector to load data into two different Hive sinks. The agent appears to get stuck once the source has started: no error is logged after that point, and no data is loaded.

      #Source
      test_interceptors.sources = RTI
      test_interceptors.channels = RTI_Channel1 RTI_Channel2
      test_interceptors.sinks = RTI_to_hive1 RTI_to_hive1

      test_interceptors.sources.RTI.channels = RTI_Channel1 RTI_Channel2

      #test_interceptors.sinks.RTI_to_hive3.channel = RTI_Channel3

      test_interceptors.sources.RTI.type = TAILDIR
      test_interceptors.sources.RTI.positionFile=/home/retailteg/flume/taildir_position.json
      test_interceptors.sources.RTI.filegroups=f1
      test_interceptors.sources.RTI.filegroups.f1=/home/retailteg/flume/flumeSpool/initial_insert
      test_interceptors.sources.RTI.filegroups.f1.headerkey1=value1
      test_interceptors.sources.RTI.fileHeader = false
      test_interceptors.sources.RTI.skipToEnd = false

      test_interceptors.sources.RTI.selector.type = multiplexing
      test_interceptors.sources.RTI.selector.header=table_name
      test_interceptors.sources.RTI.selector.mapping.Table1=RTI_Channel1
      test_interceptors.sources.RTI.selector.mapping.Table2= RTI_Channel2
      test_interceptors.sources.RTI.selector.default= RTI_Channel2

      #Channel
      test_interceptors.channels.RTI_Channel1.type = memory
      test_interceptors.channels.RTI_Channel1.capacity = 100000
      test_interceptors.channels.RTI_Channel1.transactionCapacity = 100000
      test_interceptors.channels.RTI_Channel2.type = memory
      test_interceptors.channels.RTI_Channel2.capacity = 100000
      test_interceptors.channels.RTI_Channel2.transactionCapacity = 100000

      #Sinks
      test_interceptors.sinks.RTI_to_hive1.type = hive
      test_interceptors.sinks.RTI_to_hive1.hive.metastore = thrift://172.20.180.64:9083
      test_interceptors.sinks.RTI_to_hive1.hive.database = default
      test_interceptors.sinks.RTI_to_hive1.hive.table = RTI_Test_Feb2017_8
      test_interceptors.sinks.RTI_to_hive1.hive.partition = %Y-%m-%d
      test_interceptors.sinks.RTI_to_hive1.useLocalTimeStamp = true
      test_interceptors.sinks.RTI_to_hive1.round = true
      test_interceptors.sinks.RTI_to_hive1.roundValue = 10
      test_interceptors.sinks.RTI_to_hive1.roundUnit = minute
      test_interceptors.sinks.RTI_to_hive1.serializer = DELIMITED
      test_interceptors.sinks.RTI_to_hive1.serializer.delimiter = ","
      test_interceptors.sinks.RTI_to_hive1.serializer.serdeSeparator = ','
      test_interceptors.sinks.RTI_to_hive1.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag

      test_interceptors.sinks.RTI_to_hive2.type = hive
      test_interceptors.sinks.RTI_to_hive2.hive.metastore = thrift://172.20.180.64:9083
      test_interceptors.sinks.RTI_to_hive2.hive.database = default
      test_interceptors.sinks.RTI_to_hive2.hive.table = RTI_Test_Feb2017_9
      test_interceptors.sinks.RTI_to_hive2.hive.partition = %Y-%m-%d
      test_interceptors.sinks.RTI_to_hive2.useLocalTimeStamp = true
      test_interceptors.sinks.RTI_to_hive2.round = true
      test_interceptors.sinks.RTI_to_hive2.roundValue = 10
      test_interceptors.sinks.RTI_to_hive2.roundUnit = minute
      test_interceptors.sinks.RTI_to_hive2.serializer = DELIMITED
      test_interceptors.sinks.RTI_to_hive2.serializer.delimiter = ","
      test_interceptors.sinks.RTI_to_hive2.serializer.serdeSeparator = ','
      test_interceptors.sinks.RTI_to_hive2.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag

      test_interceptors.sinks.RTI_to_hive1.channel = RTI_Channel1
      test_interceptors.sinks.RTI_to_hive2.channel = RTI_Channel2
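
      One thing that stands out in the configuration above: the test_interceptors.sinks line names RTI_to_hive1 twice and never names RTI_to_hive2. Flume only starts components that appear in the agent's component lists, so the RTI_to_hive2.* properties would be ignored and RTI_Channel2 would have no sink draining it. This is consistent with the log below, which shows only "Starting Sink RTI_to_hive1". A minimal sketch of the declaration, assuming the intent was to run both Hive sinks defined in this file:

      ```properties
      # Assumption: the second entry was meant to be RTI_to_hive2, matching the
      # RTI_to_hive2.* sink properties and its binding to RTI_Channel2.
      test_interceptors.sources = RTI
      test_interceptors.channels = RTI_Channel1 RTI_Channel2
      test_interceptors.sinks = RTI_to_hive1 RTI_to_hive2
      ```

      Since RTI_Channel2 is also the selector's default channel, events routed there would otherwise accumulate in memory without ever reaching Hive.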

      Log file contents

      2017-03-10 19:44:18,770 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel RTI_Channel1
      2017-03-10 19:44:18,773 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel RTI_Channel2
      2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: RTI_Channel1: Successfully registered new MBean.
      2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: RTI_Channel1 started
      2017-03-10 19:44:18,776 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:159)] Waiting for channel: RTI_Channel2 to start. Sleeping for 500 ms
      2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: RTI_Channel2: Successfully registered new MBean.
      2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: RTI_Channel2 started
      2017-03-10 19:44:19,276 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink RTI_to_hive1
      2017-03-10 19:44:19,277 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source RTI
      2017-03-10 19:44:19,277 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:92)] RTI TaildirSource source starting with directory: {f1=/home/retailteg/flume/flumeSpool/initial_insert}
      2017-03-10 19:44:19,281 (lifecycleSupervisor-1-5) [DEBUG - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:75)] Initializing ReliableTaildirEventReader with directory={f1=/home/retailteg/flume/flumeSpool/initial_insert}, metaDir={}
      2017-03-10 19:44:19,283 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:83)] taildirCache: [{filegroup='f1', filePattern='/home/retailteg/flume/flumeSpool/initial_insert', cached=true}]
      2017-03-10 19:44:19,285 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:84)] headerTable: {}
      2017-03-10 19:44:19,289 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 0
      2017-03-10 19:44:19,290 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:94)] Updating position from position file: /home/retailteg/flume/taildir_position.json
      2017-03-10 19:44:19,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling sink runner starting
      2017-03-10 19:44:19,362 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.TailFile.updatePos(TailFile.java:126)] Updated position, file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 884
      2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [DEBUG - org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:118)] TaildirSource started
      2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: RTI: Successfully registered new MBean.
      2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: RTI started
      2017-03-10 19:44:19,366 (PollableSourceRunner-TaildirSource-RTI) [DEBUG - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:127)] Polling runner starting. Source:Taildir source: { positionFile: /home/retailteg/flume/taildir_position.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 }

      2017-03-10 19:46:19,459 (PollableSourceRunner-TaildirSource-RTI) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 884
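
      The log above reports "headerTable: {}", which suggests the Taildir source is attaching no headers to events. Nothing else in the configuration sets the table_name header that the multiplexing selector keys on, so every event would fall through to selector.default (RTI_Channel2). One possible way to populate that header is a regex_extractor interceptor. The sketch below assumes that each tailed line begins with the table name followed by a comma, which serializer.fieldnames hints at but the report does not confirm. The interceptor name i1 and serializer name s1 are placeholders.

      ```properties
      # Sketch only: assumes every line starts with "<table name>,".
      # "i1" and "s1" are hypothetical names chosen for illustration.
      test_interceptors.sources.RTI.interceptors = i1
      test_interceptors.sources.RTI.interceptors.i1.type = regex_extractor
      test_interceptors.sources.RTI.interceptors.i1.regex = ^([^,]+),
      test_interceptors.sources.RTI.interceptors.i1.serializers = s1
      test_interceptors.sources.RTI.interceptors.i1.serializers.s1.name = table_name
      ```

      The extracted value would have to match the selector mapping keys (Table1, Table2) exactly for an event to be routed anywhere other than the default channel. If all lines in a file group belong to a single table, a fixed header set through the Taildir source's headers.<filegroupName>.<headerKey> property may be a simpler alternative.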

            People

            • Assignee:
              Unassigned
            • Reporter:
              sanathkumar.prasanna@tcs.com sanathkumar prasanna