Flume / FLUME-1676

ExecSource should provide a configurable charset

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: notrack
    • Fix Version/s: 1.4.0
    • Component/s: None
    • Labels:
    • Environment:
    • Release Note:
      Readers and Strings used to exchange data with the native process and the channel are now charset-aware.

      Description

      The character set is currently not configurable in the exec source - http://flume.apache.org/FlumeUserGuide.html#exec-source

      File - https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java

      Can somebody please expose the ability to specify character set in the exec source?


          Activity

          hudson Hudson added a comment -

          Integrated in flume-trunk #340 (See https://builds.apache.org/job/flume-trunk/340/)
          FLUME-1676: ExecSource should provide a configurable charset (Revision d20c94ca61103632de2cd941a716dbd4d9c6d719)

          Result = SUCCESS
          brock : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=d20c94ca61103632de2cd941a716dbd4d9c6d719
          Files :

          • flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
          • flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
          brocknoland Brock Noland added a comment -

          Committed to trunk and 1.4! Thanks Nitin for your contribution!

          nitin_matrix Nitin Verma added a comment -

          https://reviews.apache.org/r/8383/

          Cleaned up, but I am not able to 'Update Diff' for the patch.

          brocknoland Brock Noland added a comment -

          Thanks Nitin! I commented on the review board item. Thank you very much for the patch!!

          nitin_matrix Nitin Verma added a comment -

          Sure, done. Brock & Roshan:

          https://reviews.apache.org/r/8383/

          brocknoland Brock Noland added a comment -

          Nitin,

          Thank you for the patch!! Since it's a non-trivial patch, would you mind posting a review board item? reviews.apache.org

          Brock

          roshan_naik Roshan Naik added a comment -

          would be nice to have it on review board.

          nitin_matrix Nitin Verma added a comment -

          The fix was uploaded on 3rd Nov; I am waiting for comments.

          nitin_matrix Nitin Verma added a comment -

          Attaching the patch; please review.

          nitin_matrix Nitin Verma added a comment -

          From 22a33fb248fc60bd46a594c1980841ca6a19c8a1 Mon Sep 17 00:00:00 2001
          From: Nitin Verma <nitin.matrix@gmail.com>
          Date: Sun, 4 Nov 2012 02:07:49 +0530
          Subject: [PATCH 2/2] Added charset support for ExecSource

          ---
           .../java/org/apache/flume/source/ExecSource.java | 20 +++++++++++++++-----
           .../source/ExecSourceConfigurationConstants.java |  8 ++++++++
           2 files changed, 23 insertions(+), 5 deletions(-)

          diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
          index 46f672f..9d3b70a 100644
          --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
          +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
          @@ -42,6 +42,7 @@ import org.slf4j.Logger;
           import org.slf4j.LoggerFactory;
           
           import com.google.common.base.Preconditions;
          +import java.nio.charset.Charset;
           
           /**
            * <p>
          @@ -149,6 +150,7 @@ Configurable {
             private boolean logStderr;
             private Integer bufferCount;
             private ExecRunnable runner;
          +  private Charset charset;
           
             @Override
             public void start() {
          @@ -158,7 +160,7 @@ Configurable {
               counterGroup = new CounterGroup();
           
               runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
          -        restart, restartThrottle, logStderr, bufferCount);
          +        restart, restartThrottle, logStderr, bufferCount, charset);
           
               // FIXME: Use a callback-like executor / future to signal us upon failure.
               runnerFuture = executor.submit(runner);
          @@ -224,13 +226,16 @@ Configurable {
           
               bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
                   ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
          +
          +    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
          +        ExecSourceConfigurationConstants.DEFAULT_CHARSET));
             }
           
             private static class ExecRunnable implements Runnable {
           
               public ExecRunnable(String command, ChannelProcessor channelProcessor,
                   CounterGroup counterGroup, boolean restart, long restartThrottle,
          -        boolean logStderr, int bufferCount) {
          +        boolean logStderr, int bufferCount, Charset charset) {
                 this.command = command;
                 this.channelProcessor = channelProcessor;
                 this.counterGroup = counterGroup;
          @@ -238,6 +243,7 @@ Configurable {
                 this.bufferCount = bufferCount;
                 this.restart = restart;
                 this.logStderr = logStderr;
          +      this.charset = charset;
               }
           
               private String command;
          @@ -247,6 +253,7 @@ Configurable {
               private long restartThrottle;
               private int bufferCount;
               private boolean logStderr;
          +    private Charset charset;
               private Process process = null;
           
               @Override
          @@ -258,11 +265,11 @@ Configurable {
                     String[] commandArgs = command.split("\\s+");
                     process = new ProcessBuilder(commandArgs).start();
                     reader = new BufferedReader(
          -              new InputStreamReader(process.getInputStream()));
          +              new InputStreamReader(process.getInputStream(), charset));
           
                     // StderrLogger dies as soon as the input stream is invalid
                     StderrReader stderrReader = new StderrReader(new BufferedReader(
          -              new InputStreamReader(process.getErrorStream())), logStderr);
          +              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
                     stderrReader.setName("StderrReader-[" + command + "]");
                     stderrReader.setDaemon(true);
                     stderrReader.start();
          @@ -271,7 +278,7 @@ Configurable {
                     List<Event> eventList = new ArrayList<Event>();
                     while ((line = reader.readLine()) != null) {
                       counterGroup.incrementAndGet("exec.lines.read");
          -            eventList.add(EventBuilder.withBody(line.getBytes()));
          +            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
                       if(eventList.size() >= bufferCount) {
                         channelProcessor.processEventBatch(eventList);
                         eventList.clear();
          @@ -340,6 +347,9 @@ Configurable {
                   String line = null;
                   while((line = input.readLine()) != null) {
                     if(logStderr) {
          +            // There is no need to read 'line' with a charset
          +            // as we do not to propagate it.
          +            // It is in UTF-16 and would be printed in UTF-8 format.
                       logger.info("StderrLogger[{}] = '{}'", ++i, line);
                     }
                   }
          diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
          index 0ba0508..2176d20 100644
          --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
          +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
          @@ -18,6 +18,8 @@
            */
           package org.apache.flume.source;
           
          +import java.nio.charset.Charset;
          +
           public class ExecSourceConfigurationConstants {
           
             /**
          @@ -43,4 +45,10 @@ public class ExecSourceConfigurationConstants {
              */
             public static final String CONFIG_BATCH_SIZE = "batchSize";
             public static final int DEFAULT_BATCH_SIZE = 20;
          +
          +  /**
          +   * Charset for reading input
          +   */
          +  public static final String CHARSET = "charset";
          +  public static final String DEFAULT_CHARSET = "UTF-8";
           }
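The patch above resolves the charset with Charset.forName, which throws for an illegal or unsupported name. The following is a minimal standalone sketch of that lookup, not code from the patch: the class name CharsetConfigSketch and the use of a plain Map in place of Flume's Context are assumptions made for illustration. Only the key "charset" and the default "UTF-8" are taken from the patch.

```java
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.HashMap;
import java.util.Map;

public class CharsetConfigSketch {
    // Same key and default value as the constants added by the patch.
    public static final String CHARSET = "charset";
    public static final String DEFAULT_CHARSET = "UTF-8";

    /**
     * Looks up the charset name in a plain map (a hypothetical stand-in for
     * Flume's Context) and resolves it, failing with a readable message.
     */
    public static Charset resolveCharset(Map<String, String> config) {
        String name = config.getOrDefault(CHARSET, DEFAULT_CHARSET);
        try {
            return Charset.forName(name);
        } catch (IllegalCharsetNameException | UnsupportedCharsetException e) {
            throw new IllegalArgumentException("Unusable charset in configuration: " + name, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(resolveCharset(config));   // no key set, so the default applies
        config.put(CHARSET, "ISO-8859-1");
        System.out.println(resolveCharset(config));   // explicitly configured charset
    }
}
```

Failing early at configuration time, as sketched here, is one possible design; the patch itself simply lets Charset.forName throw.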
          nitin_matrix Nitin Verma added a comment -

          So there are two ways to deal with these bytes:
          1. Do not use String/Reader, that is deal with InputStream/byte[].
          2. Make String/Reader charset aware
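Option 1 could look roughly like the sketch below, which never builds a String and therefore cannot alter the bytes. It is an illustration only, not what was committed: the class name RawLineSplitter is hypothetical, and the approach assumes a charset in which the byte 0x0A always means a newline (true for ASCII-compatible charsets such as ISO-8859-1 and UTF-8, not for UTF-16). It also does not strip a trailing carriage return.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class RawLineSplitter {
    /** Splits the stream on '\n' (0x0A) and returns each line's bytes untouched. */
    public static List<byte[]> readLines(InputStream in) throws IOException {
        List<byte[]> lines = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                // End of line: emit the bytes collected so far, without the newline.
                lines.add(current.toByteArray());
                current.reset();
            } else {
                current.write(b);
            }
        }
        // A final line without a trailing newline is still emitted.
        if (current.size() > 0) {
            lines.add(current.toByteArray());
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {0x40, (byte) 0xC2, (byte) 0xE6, 0x40, '\n', 0x41, (byte) 0xC2};
        for (byte[] line : readLines(new ByteArrayInputStream(data))) {
            for (byte x : line) {
                System.out.printf("  %02X", x);
            }
            System.out.println();
        }
    }
}
```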

          nitin_matrix Nitin Verma added a comment -

          Hi Mike,

          InputStreamReader needs to know the charset; otherwise readLine corrupts the bytes.

          bufferedReader = new BufferedReader(new InputStreamReader(byteArrayInputStream, charset));
          bufferedReader.readLine().getBytes(charset);

          package edu.nitin.testcodes;
          
          import java.io.BufferedReader;
          import java.io.ByteArrayInputStream;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.nio.charset.Charset;
          import org.testng.annotations.Test;
          
          public class CharsetStreamTest {
          
              @Test
              public void testCharset() throws IOException {
                  final byte[] bytes = new byte[]{
                      (byte) 0x40, (byte) 0xC2, (byte) 0xE6, (byte) 0x40, (byte) '\n',
                      (byte) 0x41, (byte) 0xC2, (byte) 0xE6, (byte) 0x40, (byte) '\n',
                      (byte) 0x42, (byte) 0xC2, (byte) 0xE6, (byte) 0x40, (byte) '\n',
                      (byte) 0x43, (byte) 0xC2, (byte) 0xE6, (byte) 0x40, (byte) '\n',
                      (byte) 0x44, (byte) 0xC2, (byte) 0xE6, (byte) 0x40
                  };
          
                  final Charset charset = Charset.forName("ISO-8859-1");
                  System.out.println("Input bytes");
                  print(bytes);
          
                  System.out.println("ingest using charset");
                  {
                      final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
          
                      final BufferedReader bufferedReader = new BufferedReader(
                              new InputStreamReader(byteArrayInputStream, charset));
                      String line;
                      while ((line = bufferedReader.readLine()) != null) {
                          print(line.getBytes(charset));
                      }
                  }
          
                  System.out.println("ingest without using charset");
                  {
                      final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
          
                      final BufferedReader bufferedReader = new BufferedReader(
                              new InputStreamReader(byteArrayInputStream));
                      String line;
                      while ((line = bufferedReader.readLine()) != null) {
                          print(line.getBytes(charset));
                      }
                  }
          
              }
          
              private void print(final byte bytes[]) {
                  for (byte b : bytes) {
                      System.out.printf("  %02X", b);
                  }
                  System.out.println();
              }
          }
          
          Input bytes
            40  C2  E6  40  0A  41  C2  E6  40  0A  42  C2  E6  40  0A  43  C2  E6  40  0A  44  C2  E6  40
          ingest using charset
            40  C2  E6  40
            41  C2  E6  40
            42  C2  E6  40
            43  C2  E6  40
            44  C2  E6  40
          ingest without using charset
            40  3F  3F  40
            41  3F  3F  40
            42  3F  3F  40
            43  3F  3F  40
            44  3F  3F
          
          nitin_matrix Nitin Verma added a comment -

          Hi Mike,

          I did some testing on constructing Java strings from ISO-8859-1 bytes. A Java String translates the given bytes to UTF-16, so if the charset used for decoding is not the correct one, the conversion is lossy. (The default is UTF-8.)

          For Flume, we should ingest and egest bytes to and from strings using the charset, so that the channel gets the same bytes as the user's source had, and likewise the sink.

          string = new String(bytes, charset);
          string.getBytes(charset);

          TODO: I will do similar tests on streams.

          Java Test Code

          package edu.nitin.testcodes;
          
          import java.nio.charset.Charset;
          import org.testng.annotations.Test;
          
          public class CharsetTest {
          
              @Test
              public void testCharset() {
                  final byte[] bytes = new byte[]{(byte) 0x40, (byte) 0xC2, (byte) 0xE6,(byte) 0x40};
                  final Charset charset = Charset.forName("ISO-8859-1");
                  System.out.println("Input bytes");
                  print(bytes);
          
                  System.out.println("ingest using charset");
                  {
                      final String string = new String(bytes, charset);
                      System.out.println(string);
                      print(string.getBytes());
                      print(string.getBytes(charset));
                  }
          
                  System.out.println("ingest without using charset");
                  {
                      final String string = new String(bytes);
                      System.out.println(string);
                      print(string.getBytes());
                      print(string.getBytes(charset));
                  }
          
              }
          
              private void print(final byte bytes[]) {
                  for (byte b : bytes) {
                      System.out.printf("  %02X", b);
                  }
                  System.out.println();
              }
          }
          
          

          Output

          Input bytes
            40  C2  E6  40
          ingest using charset
          @Âæ@
            40  C3  82  C3  A6  40
            40  C2  E6  40
          ingest without using charset
          @��
            40  EF  BF  BD  EF  BF  BD
            40  3F  3F
          
          mpercy Mike Percy added a comment - - edited

          Nitin: That is the guarantee Flume provides as a framework. I believe the request is the following:

          1. Provide a way to specify the charset that is provided on the terminal to Flume, so that the Exec Source knows how to decode it into a String.
          2. Provide a way to specify the charset we will store in the Flume Event object itself, when the Exec Source encodes the String into binary form using EventBuilder.

          Without the capability to specify these encodings, a user doesn't have enough control over how the Exec Source interprets his text input data.

          (Edit: clarifications)
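The two points above amount to decoding with one charset and encoding with another. A minimal sketch of that idea follows; the class name TranscodeSketch and its methods are hypothetical and are not part of Flume. The patch attached to this issue uses a single charset for both steps.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class TranscodeSketch {
    /** Decodes the input with one charset, then encodes the String with another. */
    public static byte[] transcode(byte[] input, Charset inputCharset, Charset outputCharset) {
        String decoded = new String(input, inputCharset);   // step 1: bytes to String
        return decoded.getBytes(outputCharset);             // step 2: String to event body bytes
    }

    /** Formats bytes as space-separated upper-case hex pairs. */
    public static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // "a²=¼b" as ISO-8859-1 bytes.
        byte[] latin1 = {'a', (byte) 0xB2, '=', (byte) 0xBC, 'b'};
        byte[] utf8 = transcode(latin1, StandardCharsets.ISO_8859_1, StandardCharsets.UTF_8);
        System.out.println(hex(latin1));
        System.out.println(hex(utf8));
    }
}
```

When both charsets are the same and the input is valid in that charset, the output bytes equal the input bytes, which is the byte-preserving case.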

          nitin_matrix Nitin Verma added a comment -

          There are a few questions around this request.

          Before that I would like to explain a bit about two charsets under consideration.

          Suppose we need to write a²=¼b in ISO-8859-1 (http://en.wikipedia.org/wiki/ISO/IEC_8859-1).
          1. a,b,= fall in ASCII range, thus you can type
          2. ² = B2, ¼ = BC in hex.

          $ awk 'BEGIN { printf "a%s=%sb\n", "\xB2", "\xBC" }'
          a�=�b

          Note: If this shows up as a²=¼b, then you are on ISO-8859-1.

          Now let us encode the same in UTF-8 (http://en.wikipedia.org/wiki/UTF-8)

          Char. number range | UTF-8 octet sequence
          (hexadecimal) | (binary)
          -------------------+--------------------------------------------
          0000 0000-0000 007F | 0xxxxxxx
          0000 0080-0000 07FF | 110xxxxx 10xxxxxx
          0000 0800-0000 FFFF | 1110xxxx 10xxxxxx 10xxxxxx
          0001 0000-0010 FFFF | 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
          and so on.

          The code points are the same in Unicode (² = U+00B2, ¼ = U+00BC), but UTF-8 is not a single-byte charset, so they have to be encoded.

          As B2 and BC are greater than 7F and less than 0800, each is encoded in two bytes (110xxxxx 10xxxxxx):
          B2 => 1011 0010 => 1100 0010 1011 0010 => C2 B2
          BC => 1011 1100 => 1100 0010 1011 1100 => C2 BC
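The two-byte derivation above can be checked mechanically. The sketch below applies the 110xxxxx 10xxxxxx bit layout by hand and compares the result with the JDK's own UTF-8 encoder; the class name Utf8TwoByteSketch and the method encodeTwoByte are hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf8TwoByteSketch {
    /** Encodes a code point in the range U+0080..U+07FF as 110xxxxx 10xxxxxx. */
    public static byte[] encodeTwoByte(int codePoint) {
        if (codePoint < 0x80 || codePoint > 0x7FF) {
            throw new IllegalArgumentException("not in the two-byte range: " + codePoint);
        }
        byte lead = (byte) (0xC0 | (codePoint >> 6));     // 110xxxxx: the upper 5 bits
        byte trail = (byte) (0x80 | (codePoint & 0x3F));  // 10xxxxxx: the lower 6 bits
        return new byte[] {lead, trail};
    }

    public static void main(String[] args) {
        for (int cp : new int[] {0xB2, 0xBC}) {
            byte[] manual = encodeTwoByte(cp);
            byte[] jdk = new String(Character.toChars(cp)).getBytes(StandardCharsets.UTF_8);
            System.out.printf("U+%04X => %02X %02X, matches JDK encoder: %b%n",
                    cp, manual[0], manual[1], Arrays.equals(manual, jdk));
        }
    }
}
```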

          $ awk ' BEGIN { printf "a%s=%sb\n", "\xC2\xB2", "\xC2\xBC" } '
          a²=¼b

          Note: If this shows up as a²=¼b, then you are on UTF-8.

          iconv tries to make sure it translates bytes in such a way that text in the from-charset is displayed correctly on a to-charset terminal.

          Thus it would add C2 if I do the following.

          $ awk 'BEGIN { printf "a%s=%sb\n", "\xB2", "\xBC" }' | iconv -f "ISO-8859-1" -t "UTF-8"
          a²=¼b

          Warning:
          There are many charsets around, and not all charsets support all characters, so byte translation is a lossy business. Example:
          $ awk 'BEGIN { print "\xE0\xA5\x90" }' | iconv -f "UTF-8" -t "ISO-8859-1"
          iconv: illegal input sequence at position 0
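The same failure can be reproduced in Java by asking the charset coders to report errors instead of silently substituting a replacement character. This is a sketch under that assumption; the class name LossyTranscodeSketch and the method strictTranscode are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class LossyTranscodeSketch {
    /** Transcodes strictly: throws instead of substituting '?' or U+FFFD. */
    public static byte[] strictTranscode(byte[] input, Charset from, Charset to)
            throws CharacterCodingException {
        CharBuffer chars = from.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(input));
        ByteBuffer out = to.newEncoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .encode(chars);
        byte[] result = new byte[out.remaining()];
        out.get(result);
        return result;
    }

    public static void main(String[] args) {
        // E0 A5 90 is U+0950 in UTF-8; ISO-8859-1 has no such character.
        byte[] om = {(byte) 0xE0, (byte) 0xA5, (byte) 0x90};
        try {
            strictTranscode(om, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1);
            System.out.println("converted without loss");
        } catch (CharacterCodingException e) {
            System.out.println("cannot convert: " + e);
        }
    }
}
```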

          Considering all of the above, I feel that Flume should concentrate on transferring data byte for byte from one system to another, not on translating it. If the charsets of the two systems differ, then
          source system: cat $file
          sink system: cat $file | iconv -f source-charset -t sink-charset
          should show the same visible output, as long as sink-charset defines all the characters defined in source-charset.

          ** The only guarantee Flume should give is that the bytes delivered to the sink are the same as the bytes given via the source. **
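One way to test the byte-for-byte guarantee for a String-based path is a decode and re-encode round trip over every possible byte value. The sketch below is an illustration only; the class name RoundTripCheck and its method are hypothetical.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RoundTripCheck {
    /** Returns true if decoding and re-encoding with the charset reproduces the bytes. */
    public static boolean roundTrips(byte[] data, Charset charset) {
        return Arrays.equals(data, new String(data, charset).getBytes(charset));
    }

    public static void main(String[] args) {
        // Every byte value from 0x00 to 0xFF, once each.
        byte[] allBytes = new byte[256];
        for (int i = 0; i < allBytes.length; i++) {
            allBytes[i] = (byte) i;
        }
        System.out.println("ISO-8859-1 round trip: " + roundTrips(allBytes, StandardCharsets.ISO_8859_1));
        System.out.println("UTF-8 round trip:      " + roundTrips(allBytes, StandardCharsets.UTF_8));
    }
}
```

ISO-8859-1 maps each of the 256 byte values to its own character, so it preserves arbitrary bytes; UTF-8 does not, because byte sequences that are not valid UTF-8 are replaced during decoding.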
          saggar Suresh Saggar added a comment -

          I was trying to set up a Flume NG multi-tier workflow, with agent01 running on a web server using an exec source and an avro sink, and agent02 running on a web server (a collector) using an avro source and an HDFS sink.

          Configuration file - https://gist.github.com/3993648

          The data (here, tail output) was written to HDFS, but when I cat the file I can see some formatting issues. Link to the HDFS output showing the formatting issue - https://gist.github.com/3995476

          mpercy Mike Percy added a comment -

          Talked to Suresh about this on IRC. Example would be using exec source with a tail -F command on a file that is ISO-8859 encoded.


            People

            • Assignee:
              nitin_matrix Nitin Verma
              Reporter:
              saggar Suresh Saggar