  1. Flume
  2. FLUME-3074

Format error on Windows when the Kafka sink is used

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None
    • Environment: Windows 10

    Description

      When I use the Kafka sink to push logs from a Windows path to a Kafka server, I get the format error shown below.

      I think it is because the Windows path was parsed in Linux style.

      17 Mar 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
      org.apache.flume.EventDeliveryException: Failed to publish events
          at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
          at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
          at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flume.EventDeliveryException: Non integer partition id specified
          at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214)
          ... 3 more
      Caused by: java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
          at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
          at java.lang.Integer.parseInt(Integer.java:580)
          at java.lang.Integer.parseInt(Integer.java:615)
          at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202)
          ... 3 more
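
The innermost cause in the trace is `Integer.parseInt` being handed a file path, which the sink then reports as "Non integer partition id specified". The following is a minimal sketch of that failure, not Flume's actual code: the class `PartitionIdSketch` and the helper `partitionFromHeader` are hypothetical names, and it assumes the sink simply reads one event header and parses it as an integer.

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionIdSketch {
    // Hypothetical helper (not part of Flume): reads a partition id from an event header.
    static Integer partitionFromHeader(Map<String, String> headers, String headerName) {
        String raw = headers.get(headerName);
        if (raw == null) {
            return null; // header absent: no explicit partition requested
        }
        // Throws NumberFormatException when the header holds non-numeric text.
        return Integer.parseInt(raw);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();

        // A file path in the header reproduces the exception type seen in the trace.
        headers.put("key", "C:\\Users\\smart\\Desktop\\source\\23.txt");
        try {
            partitionFromHeader(headers, "key");
        } catch (NumberFormatException e) {
            System.out.println("Not a partition id: " + e.getMessage());
        }

        // A numeric header value parses without error.
        headers.put("key", "2");
        System.out.println("Partition: " + partitionFromHeader(headers, "key"));
    }
}
```

The same exception would be thrown for a Linux-style path, so the trace by itself does not point to Windows-specific path handling.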

      My configuration is:
      ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      tier1.sources = s1
      tier1.channels = c1
      tier1.sinks = sk1

      tier1.sources.s1.type = spooldir
      tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source
      tier1.sources.s1.fileHeader = true
      tier1.sources.s1.fileHeaderKey = key
      tier1.sources.s1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
      tier1.sources.s1.deserializer.maxBlobLength = 2000000000
      tier1.sources.s1.deserializer.maxBackoff=30000

      tier1.sources.s1.channels = c1

      tier1.channels.c1.type = memory
      tier1.channels.c1.capacity = 10004
      tier1.channels.c1.transactionCapacity = 100

      tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
      tier1.sinks.sk1.topic = winSink
      tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
      tier1.sinks.sk1.partitionIdHeader = key
      tier1.sinks.sk1.channel = c1
      tier1.sinks.sk1.batchSize = 20
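
In this configuration the source's `fileHeaderKey` and the sink's `partitionIdHeader` are both set to `key`, so the file path written by the spooldir source lands in the same header that the sink reads as a partition id. This would explain the `NumberFormatException` on the path and is consistent with the "Not A Bug" resolution. The sketch below checks a configuration for that collision. `HeaderCollisionCheck` and `pathHeaderIsPartitionHeader` are hypothetical names, and the defaults used for unset properties (`fileHeader` off, `fileHeaderKey` equal to `file`) are assumptions taken from the Flume user guide.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class HeaderCollisionCheck {
    // Hypothetical check (not part of Flume): true when the spooldir source stores the
    // file path in the same header that the Kafka sink reads its partition id from.
    static boolean pathHeaderIsPartitionHeader(Properties p, String source, String sink) {
        // Assumed defaults when a property is unset: fileHeader=false, fileHeaderKey=file.
        boolean fileHeader = Boolean.parseBoolean(p.getProperty(source + ".fileHeader", "false"));
        String fileHeaderKey = p.getProperty(source + ".fileHeaderKey", "file");
        String partitionHeader = p.getProperty(sink + ".partitionIdHeader");
        return fileHeader && partitionHeader != null && fileHeaderKey.equals(partitionHeader);
    }

    public static void main(String[] args) throws IOException {
        // The three lines of the reported configuration that matter for the collision.
        String config = String.join("\n",
            "tier1.sources.s1.fileHeader = true",
            "tier1.sources.s1.fileHeaderKey = key",
            "tier1.sinks.sk1.partitionIdHeader = key");

        Properties p = new Properties();
        p.load(new StringReader(config));

        System.out.println(pathHeaderIsPartitionHeader(p, "tier1.sources.s1", "tier1.sinks.sk1"));
    }
}
```

If the check reports a collision, removing `partitionIdHeader` from the sink, or pointing it at a header that actually holds an integer, should avoid the parse error.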

          People

            Assignee: Unassigned
            Reporter: zhiqiangzhao