Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 1.7.0
- Fix Version/s: None
- Component/s: None
- Environment: Ubuntu VM
uname -a
Linux ub64-master 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
Description
Getting this error when trying to use org.apache.flume.sink.kite.DatasetSink:
- apache-flume-1.7.0-SNAPSHOT-bin/logs/flume.log
08 Oct 2015 15:19:39,991 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96) - Component type: SOURCE, name: spooldir-src started
08 Oct 2015 15:19:40,183 ERROR [pool-3-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source spooldir-src: { spoolDir: /var/flume/spooldir/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
{path=file:/home/bob/kite_demo/dns_data/.2d20b2e7-64f2-4d09-87da-aa12120518f7.avro.tmp, schema="bytes", fileSystem=org.apache.hadoop.fs.LocalFileSystem@15e5a95d, enableCompression=true, dataFileWriter=org.apache.avro.file.DataFileWriter@18d223e6, writer=org.apache.avro.reflect.ReflectDatumWriter@c32ddc5}
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:282)
at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
08 Oct 2015 15:19:40,550 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize:147) - Opened output appender AvroAppenderfor file:/home/bob/kite_demo/dns_data/2d20b2e7-64f2-4d09-87da-aa12120518f7.avro
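The MalformedInputException in the stack trace above comes from the line deserializer decoding the spooled file as UTF-8 text, and a binary Avro container file is full of byte sequences that are not valid UTF-8. A minimal stdlib-only sketch of that failure mode (the class name and byte values here are illustrative, not taken from Flume's source; REPORT matches Flume's default decodeErrorPolicy of FAIL):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;

public class DecodeDemo {
    // Decode raw bytes with a strict UTF-8 decoder, roughly what
    // LineDeserializer does via ResettableFileInputStream when the
    // decode error policy is FAIL.
    static String tryDecode(byte[] raw) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT);
        try {
            return "decoded: " + decoder.decode(ByteBuffer.wrap(raw));
        } catch (MalformedInputException e) {
            return "MalformedInputException: Input length = " + e.getInputLength();
        } catch (CharacterCodingException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        // 0xC3 opens a two-byte UTF-8 sequence; 0x01 cannot continue it.
        // Binary Avro container files routinely contain such byte patterns.
        System.out.println(tryDecode(new byte[]{(byte) 0xC3, (byte) 0x01}));
        // -> MalformedInputException: Input length = 1
    }
}
```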
- Here is how I get into this problem...
- This is the Flume configuration file, conf/flume.kite.conf:
a5.sources = spooldir-src
a5.sinks = sink1
a5.channels = mem-ch1
a5.channels.mem-ch1.type = memory
a5.channels.mem-ch1.capacity = 10000
a5.channels.mem-ch1.transactionCapacity = 1000
a5.sources.spooldir-src.type = spooldir
a5.sources.spooldir-src.spoolDir = /var/flume/spooldir/
a5.sources.spooldir-src.deletePolicy = immediate
a5.sources.spooldir-src.channels = mem-ch1
a5.sources.spooldir-src.selector.type = replicating
a5.sources.spooldir-src.interceptors = i1
a5.sources.spooldir-src.interceptors.i1.type = org.apache.flume.interceptor.ibInterceptor$Builder
a5.sources.spooldir-src.interceptors.i1.preserveExisting = false
a5.sources.spooldir-src.interceptors.i1.header = flume.avro.schema.literal
a5.sources.spooldir-src.interceptors.i1.schema = /var/schema/dns_data.avsc
a5.sinks.sink1.type = org.apache.flume.sink.kite.DatasetSink
a5.sinks.sink1.channel = mem-ch1
a5.sinks.sink1.kite.dataset.uri = dataset:file:/home/bob/kite_demo/dns_data
a5.sinks.sink1.kite.entityParser = avro
cat /var/schema/dns_data.avsc
"bytes"
- Start flume
apache-flume-1.7.0-SNAPSHOT-bin$ ./bin/flume-ng agent -c conf -f conf/flume.kite.conf -n a5
- Receive a text file with one line only
- captured-dns.txt
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;
- copy it to /home/bob/tmp/captured-dns_filtered
- serialize it using the generic schema:
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar fromtext /home/bob/tmp/captured-dns_filtered /home/bob/tmp/captured-dns_filtered.avro
- The produced Avro file looks OK; here is the schema pulled out of it:
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar getschema /var/flume/spooldir/captured-dns_filtered.avro
"bytes"
- and the data
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar totext /var/flume/spooldir/captured-dns_filtered.avro -
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;
- so, move the Avro file to the Flume spool folder:
mv /home/bob/tmp/captured-dns_filtered.avro /var/flume/spooldir/
- and the error logged above happens.
- I added an interceptor, which seems to be configured correctly; I can see its debug messages when I start Flume:
/**
 * Only {@link ibInterceptor.Builder} can build me.
 */
private ibInterceptor(boolean preserveExisting, boolean useIP,
                      String header, String schema)
{
    this.preserveExisting = preserveExisting;
    this.header = header;
    this.schema = schema;
    InetAddress addr;
    System.out.println("\n-ibInterceptor header=" + header);
    System.out.println("-ibInterceptor schema=" + schema);
    try {
        addr = InetAddress.getLocalHost();
        if (useIP) {
            host = addr.getHostAddress();
            System.out.println("-ibInterceptor host=" + host);
        } else {
            host = addr.getCanonicalHostName();
        }
    } catch (UnknownHostException e) {
        logger.warn("Could not get local host address. Exception follows.", e);
    }
}
- The intercept() method is not called; the println message at its very beginning never gets displayed, though it should based on the Javadoc. Using this same custom interceptor with other kinds of sinks, like file_roll, works fine.
/**
 * Modifies events in-place.
 */
@Override
public Event intercept(Event event)
{
    System.out.println("--intercept()");
    if (schema != null) {
        String schema_def = "bytes";
        headers.put(header, schema_def);
        System.out.println("-intercept(): schema_def=" + schema_def);
    }
    return event;
}
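Stripped of Flume's Event type, the header insertion the interceptor is meant to perform boils down to the following (a minimal sketch; the class and method names are hypothetical, and a plain Map stands in for event.getHeaders()):

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderDemo {
    // Put the Avro schema literal into the event headers so DatasetSink
    // can look it up under flume.avro.schema.literal.
    static Map<String, String> addSchemaHeader(Map<String, String> headers,
                                               String header, String schema) {
        if (schema != null) {
            headers.put(header, schema);
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        addSchemaHeader(headers, "flume.avro.schema.literal", "\"bytes\"");
        System.out.println(headers);
        // -> {flume.avro.schema.literal="bytes"}
    }
}
```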
- The only purpose of my interceptor is to insert the schema into the Flume event header; without it, Flume complains:
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kite.policy.RetryPolicy.handle:39) - Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
- which makes sense based on the Kite Dataset Sink docs.
Am I doing something wrong?