FLUME-2809

sink.kite.DatasetSink MalformedInputException

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None
    • Environment: Ubuntu VM:
      uname -a
      Linux ub64-master 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

    Description

      I get the following error when trying to use org.apache.flume.sink.kite.DatasetSink.

      From apache-flume-1.7.0-SNAPSHOT-bin/logs/flume.log:
        08 Oct 2015 15:19:39,991 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96) - Component type: SOURCE, name: spooldir-src started
        08 Oct 2015 15:19:40,183 ERROR [pool-3-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source spooldir-src: { spoolDir: /var/flume/spooldir/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
        java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:282)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
        08 Oct 2015 15:19:40,550 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize:147) - Opened output appender AvroAppender{path=file:/home/bob/kite_demo/dns_data/.2d20b2e7-64f2-4d09-87da-aa12120518f7.avro.tmp, schema="bytes", fileSystem=org.apache.hadoop.fs.LocalFileSystem@15e5a95d, enableCompression=true, dataFileWriter=org.apache.avro.file.DataFileWriter@18d223e6, writer=org.apache.avro.reflect.ReflectDatumWriter@c32ddc5} for file:/home/bob/kite_demo/dns_data/2d20b2e7-64f2-4d09-87da-aa12120518f7.avro
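
      The stack trace above shows LineDeserializer reading characters from the spooled file when the MalformedInputException is thrown. As a minimal sketch of that failure mode, using only the JDK and no Flume classes, the snippet below decodes bytes as strict UTF-8. The class name, method names and sample bytes are illustrative assumptions, not taken from Flume.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it only mimics strict character decoding.
final class StrictDecodeDemo {

    /** Decodes bytes as UTF-8 and fails on the first invalid byte sequence. */
    static String decodeStrict(byte[] bytes) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(bytes))
                .toString();
    }

    /** Convenience wrapper: true if the bytes decode cleanly as UTF-8. */
    static boolean isValidUtf8(byte[] bytes) {
        try {
            decodeStrict(bytes);
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Plain text, like the original captured-dns.txt line.
        byte[] text = "query: a1.z1.com IN A".getBytes(StandardCharsets.UTF_8);
        // Made-up binary sample: 0xFF can never appear in valid UTF-8.
        byte[] binary = {'O', 'b', 'j', 1, (byte) 0xFF, 0x10};
        try {
            System.out.println(decodeStrict(text));
            decodeStrict(binary);
        } catch (MalformedInputException e) {
            System.out.println("malformed input, length " + e.getInputLength());
        } catch (CharacterCodingException e) {
            System.out.println("other decoding error: " + e);
        }
    }
}
```

      A binary Avro container file will usually contain bytes like these, so reading it line by line as text is likely to fail in this way. The Flume User Guide documents a deserializer property on the spooling directory source (LINE by default) along with an AVRO deserializer; whether switching to it resolves this report has not been confirmed here.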

      Here is how I get into this problem.
      This is the Flume configuration file, conf/flume.kite.conf:
        a5.sources = spooldir-src
        a5.sinks = sink1
        a5.channels = mem-ch1
        a5.channels.mem-ch1.type = memory
        a5.channels.mem-ch1.capacity = 10000
        a5.channels.mem-ch1.transactionCapacity = 1000
        a5.sources.spooldir-src.type = spooldir
        a5.sources.spooldir-src.spoolDir = /var/flume/spooldir/
        a5.sources.spooldir-src.deletePolicy = immediate
        a5.sources.spooldir-src.channels = mem-ch1
        a5.sources.spooldir-src.selector.type = replicating
        a5.sources.spooldir-src.interceptors = i1
        a5.sources.spooldir-src.interceptors.i1.type = org.apache.flume.interceptor.ibInterceptor$Builder
        a5.sources.spooldir-src.interceptors.i1.preserveExisting = false
        a5.sources.spooldir-src.interceptors.i1.header = flume.avro.schema.literal
        a5.sources.spooldir-src.interceptors.i1.schema = /var/schema/dns_data.avsc
        a5.sinks.sink1.type = org.apache.flume.sink.kite.DatasetSink
        a5.sinks.sink1.channel = mem-ch1
        a5.sinks.sink1.kite.dataset.uri = dataset:file:/home/bob/kite_demo/dns_data
        a5.sinks.sink1.kite.entityParser = avro

      cat /var/schema/dns_data.avsc
      "bytes"

      Start Flume:
        apache-flume-1.7.0-SNAPSHOT-bin$ ./bin/flume-ng agent -c conf -f conf/flume.kite.conf -n a5
      Receive a text file with one line only, captured-dns.txt:
        19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;
      Copy it to /home/bob/tmp/captured-dns_filtered and serialize it using the generic schema:
        java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar fromtext /home/bob/tmp/captured-dns_filtered /home/bob/tmp/captured-dns_filtered.avro
      The produced Avro file looks OK; here is the schema pulled out of it:
        java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar getschema /var/flume/spooldir/captured-dns_filtered.avro
        "bytes"
      And here is the data:
        java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar totext /var/flume/spooldir/captured-dns_filtered.avro -
        19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;
      So, move the Avro file to the Flume spool folder:
        mv /home/bob/tmp/captured-dns_filtered.avro /var/flume/spooldir/
      and the log shown above is produced.
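
      The file moved into the spool folder at this point is an Avro object container, not line-oriented text. As a small illustration, the sketch below checks for the four-byte magic header that the Avro specification defines for container files ('O', 'b', 'j', then the byte 1). The class and method names are hypothetical, and the file path is passed as an argument rather than assumed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

// Hypothetical helper; not part of Flume or the Avro tools.
final class AvroMagicCheck {

    /** First four bytes of an Avro object container file. */
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the given bytes start with the Avro container magic. */
    static boolean looksLikeAvroContainer(byte[] head) {
        return head.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(head, MAGIC.length), MAGIC);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: AvroMagicCheck <file>");
            return;
        }
        // Reading the whole file is fine for a one-record sample like this one.
        byte[] content = Files.readAllBytes(Paths.get(args[0]));
        System.out.println(looksLikeAvroContainer(content)
                ? "Avro container (binary)"
                : "not an Avro container");
    }
}
```

      Running it against the .avro file and against the original text file should show which of the two a line-based reader can be expected to handle.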
      I added an interceptor, which seems to be configured correctly: I can see its debug messages when I start Flume.
        /**
         * Only {@link ibInterceptor.Builder} can build me
         */
        private ibInterceptor(boolean preserveExisting, boolean useIP,
                              String header, String schema)
        {
          this.preserveExisting = preserveExisting;
          this.header = header;
          this.schema = schema;
          InetAddress addr;
          System.out.println("\n-ibInterceptor header=" + header);
          System.out.println("-ibInterceptor schema=" + schema);
          try
          {
            addr = InetAddress.getLocalHost();
            if (useIP)
            {
              host = addr.getHostAddress();
              System.out.println("-ibInterceptor host=" + host);
            }
            else
            {
              host = addr.getCanonicalHostName();
            }
          }
          catch (UnknownHostException e)
          {
            logger.warn("Could not get local host address. Exception follows.", e);
          }
        }


      The intercept() method is never called: the println message at its very beginning never gets displayed, though according to the Javadocs it should be. Using this same custom interceptor with other kinds of sinks, such as file_roll, works fine.
        /**
         * Modifies events in-place.
         */
        @Override
        public Event intercept(Event event)
        {
          System.out.println("--intercept()");

          // Assumed: the declaration of headers is not shown in the original snippet.
          Map<String, String> headers = event.getHeaders();

          if (schema != null)
          {
            String schema_def = "bytes";
            headers.put(header, schema_def);
            System.out.println("-intercept(): schema_def=" + schema_def);
          }

          return event;
        }

      The only purpose of my interceptor is to insert the schema into the Flume event header; without it, Flume complains:
        08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kite.policy.RetryPolicy.handle:39) - Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
        08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
      This makes sense based on the Kite Dataset Sink docs.
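
      The header logic described above can be sketched without any Flume dependency. In the sketch below a plain Map stands in for the event headers, and the schema text is read from a file instead of being hardcoded. The class and method names are hypothetical; only the header name flume.avro.schema.literal comes from the error message above. Note that the interceptor snippet in this report puts the string bytes (no quotes) into the header, while the .avsc file contains the JSON string "bytes"; whether that difference matters to the sink is not established here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a Map stands in for Flume's event headers.
final class SchemaHeaderSketch {

    /** Header name taken from the error message in the report. */
    static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";

    /** Reads the schema file and returns its JSON text without surrounding whitespace. */
    static String readSchemaLiteral(Path schemaFile) throws IOException {
        return new String(Files.readAllBytes(schemaFile), StandardCharsets.UTF_8).trim();
    }

    /** Sets the schema header, keeping an existing value only if preserveExisting is true. */
    static void addSchemaHeader(Map<String, String> headers,
                                String schemaLiteral,
                                boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(SCHEMA_LITERAL_HEADER)) {
            return;
        }
        headers.put(SCHEMA_LITERAL_HEADER, schemaLiteral);
    }

    public static void main(String[] args) throws IOException {
        // A temporary file stands in for /var/schema/dns_data.avsc.
        Path schemaFile = Files.createTempFile("dns_data", ".avsc");
        try {
            Files.write(schemaFile, "\"bytes\"\n".getBytes(StandardCharsets.UTF_8));
            Map<String, String> headers = new HashMap<>();
            addSchemaHeader(headers, readSchemaLiteral(schemaFile), false);
            System.out.println(headers);
        } finally {
            Files.delete(schemaFile);
        }
    }
}
```

      Reading the file once, for example when the interceptor is built, and reusing the string for every event would avoid a file read per event.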

      Am I doing something wrong?

          People

            Assignee: Unassigned
            Reporter: Jovan Kilibarda (jovan)
            Votes: 0
            Watchers: 1