Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Bug
Affects Version: 1.7.0
None
None
Environment: Windows 10
Description
I think this happens because the Windows path is parsed in Linux style. When I use the Kafka sink to push logs from a Windows path to a Kafka server, I get the following format error:
17 Mar 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: Non integer partition id specified
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214)
... 3 more
Caused by: java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202)
... 3 more
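The trace suggests that the sink read the event header named by partitionIdHeader and passed its value to Integer.parseInt, which rejects anything that is not a plain integer. The following is a minimal sketch of that step only, not Flume's actual code: the class name PartitionHeaderSketch, the method parsePartitionId, and the "partition" header are hypothetical names used for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: mimics reading a partition id from an event header.
// This is NOT the real KafkaSink implementation.
class PartitionHeaderSketch {

    // Returns the partition id stored under the given header name,
    // or null when the header is absent.
    // Throws NumberFormatException when the value is not an integer.
    static Integer parsePartitionId(Map<String, String> headers, String partitionIdHeader) {
        String value = headers.get(partitionIdHeader);
        if (value == null) {
            return null;
        }
        return Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // A file-path header, as a spooling directory source with fileHeader = true would add.
        headers.put("key", "C:\\Users\\smart\\Desktop\\source\\23.txt");
        // A header that really holds an integer (assumed name).
        headers.put("partition", "3");

        System.out.println("partition header parsed as " + parsePartitionId(headers, "partition"));

        try {
            parsePartitionId(headers, "key");
        } catch (NumberFormatException e) {
            // The path is not an integer on any operating system.
            System.out.println("key header rejected: " + e.getMessage());
        }
    }
}
```

If this reading is right, the failure does not depend on the path separator: a Linux path such as /home/smart/source/23.txt would be rejected by Integer.parseInt in the same way.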
My configuration is:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
tier1.sources = s1
tier1.channels = c1
tier1.sinks = sk1
tier1.sources.s1.type = spooldir
tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source
tier1.sources.s1.fileHeader = true
tier1.sources.s1.fileHeaderKey = key
tier1.sources.s1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
tier1.sources.s1.deserializer.maxBlobLength = 2000000000
tier1.sources.s1.deserializer.maxBackoff=30000
tier1.sources.s1.channels = c1
tier1.channels.c1.type = memory
tier1.channels.c1.capacity = 10004
tier1.channels.c1.transactionCapacity = 100
tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sk1.topic = winSink
tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
tier1.sinks.sk1.partitionIdHeader = key
tier1.sinks.sk1.channel = c1
tier1.sinks.sk1.batchSize = 20
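
In this configuration the source writes the file path into the header named key (fileHeaderKey = key), and the sink is told to read its partition id from that same header (partitionIdHeader = key). That would explain the "Non integer partition id specified" error and is consistent with the "Not A Bug" resolution. A possible adjustment, offered as an untested sketch and assuming the file path is still wanted in the key header, is to stop pointing partitionIdHeader at it:

```
# Sketch only: keep the file path in the "key" header.
tier1.sources.s1.fileHeader = true
tier1.sources.s1.fileHeaderKey = key
# Do not use the file-path header as the partition id.
# tier1.sinks.sk1.partitionIdHeader = key
```

If a fixed partition is needed, partitionIdHeader should name a header whose value is an integer.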