  1. Apache NiFi
  2. NIFI-1432

Efficient line by line csv processor



    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.4.1
    • Fix Version/s: None
    • Component/s: Core Framework
    • Environment:
      Redhat 6.4 (X64)
      Cloudera Hadoop 5.4.2
      NiFi 0.4.1



      I was planning to design an ETL flow for Hadoop. While doing that, I felt the need for an efficient line-by-line CSV processor.
      Please see below for details:

      1. The source is plain ASCII files (CSV with rows and columns). Fields are comma-separated and some of the columns are double-quoted (").
      2. Files are pushed to a local directory on the machine where NiFi is installed.
      3. We want to manipulate some of the columns (for example, masking) before we load the data into HDFS. The business requirement is that anything loaded into Hadoop must be masked.
      4. There will be 5-6 TB of data per day, and each file will be 1-2 GB in size.

      Implemented Solution:
      With the above requirements in mind, we designed the following flow in NiFi:
      ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) > FetchFile (1 thread) > CSVProcessor(4 threads) > PutHDFS (1 thread)

      • "CSVProcessor" is a custom processor. It uses opencsv to parse the CSV and identify columns.
      • I have added some business logic to "CSVProcessor", such as masking specific columns.
      • I used 4 threads for "CSVProcessor" and 1 for the others because I found it to be the slowest component.

      1. With the flow above, I was able to load 110 GB of files in 90 minutes.
      2. CSVProcessor with a single thread processes a 1 GB file in about 4 minutes, which is really slow and needs improvement.
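
      The figures above can be put on a common scale with some quick arithmetic. The sketch below uses only numbers quoted in this ticket and assumes 1 TB = 1024 GB (the ticket does not say which convention it uses); the variable names are illustrative.

```python
# Back-of-envelope throughput check using only figures quoted in this ticket.
# Assumption: 1 TB = 1024 GB (the ticket does not state the convention).
GB_PER_TB = 1024
MINUTES_PER_DAY = 24 * 60


def gb_per_minute(gigabytes, minutes):
    """Average throughput in GB per minute."""
    return gigabytes / minutes


# Measured: whole flow with 4 CSVProcessor threads, 110 GB in 90 minutes.
observed_flow = gb_per_minute(110, 90)

# Measured: one CSVProcessor thread, about 1 GB in about 4 minutes.
single_thread = gb_per_minute(1, 4)

# Required: 5-6 TB per day, sustained around the clock.
required_low = gb_per_minute(5 * GB_PER_TB, MINUTES_PER_DAY)
required_high = gb_per_minute(6 * GB_PER_TB, MINUTES_PER_DAY)

# What the measured flow would deliver if it ran for a full day.
observed_tb_per_day = observed_flow * MINUTES_PER_DAY / GB_PER_TB

if __name__ == "__main__":
    print(f"observed flow:   {observed_flow:.2f} GB/min")
    print(f"single thread:   {single_thread:.2f} GB/min")
    print(f"required range:  {required_low:.2f} to {required_high:.2f} GB/min")
    print(f"observed per day: {observed_tb_per_day:.2f} TB")
```

      Under these assumptions the measured flow runs at roughly 1.2 GB per minute, while 5-6 TB per day needs about 3.6 to 4.3 GB per minute sustained.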

      To investigate the slowness of CSVProcessor, we followed the steps below:
      1. Initially we ran the flow above with the default heap size (set in conf/bootstrap.conf).

      With this configuration we checked the following:

      • "jstat -gcutil <PID of NiFi found from jps> 1000"
      • "iostat -xmh 1"
        [see attached iostat.txt & jstat.txt]
        We found that garbage collection was slow due to an undersized Java heap. CPU and I/O showed no issues.

      2. Then we increased the heap size.

      We checked the output of jstat and iostat again. This time no problem was found with heap size, I/O, or CPU. [see attached iostat2.txt & jstat2.txt]
      However, the processor (CSVProcessor) is still as slow as before, with almost no improvement.

      For details, please see this thread on the NiFi users mailing list: http://apache-nifi.1125220.n5.nabble.com/Data-Ingestion-forLarge-Source-Files-and-Masking-td2535.html


      I am proposing a faster CSV processor with the following requirements:
      a. It can read any ASCII/UTF-8 CSV file and identify columns.
      b. It should typically be able to parse and process at least 2 GB per minute.
      c. It should offer pluggable functionality. For example, I want to mask specific columns at positions [0, 5 & 7] with my already built jar, makser.jar.
      d. You are welcome to reuse/modify my code [github link above].
      e. To make it faster we may use some in-memory batch processing. For example, we load each file into some in-memory storage and batch-update specific columns to meet the business requirements.
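
      Requirements (a) and (c) can be illustrated with a minimal sketch of row-by-row processing with a pluggable masking function. It is written in Python for brevity and is not NiFi code or the CSVProcessor attached to this ticket. The names mask_value and mask_csv_stream are hypothetical, the masker merely stands in for whatever the external jar does, and column positions are assumed to be zero-based.

```python
import csv
import io


def mask_value(value):
    """Hypothetical masker: replace every character with '*'."""
    return "*" * len(value)


def mask_csv_stream(source, sink, columns, masker=mask_value):
    """Copy CSV rows from source to sink one row at a time.

    Only the row currently being processed is held in memory, so the
    input size does not affect memory use. Values at the given
    zero-based column positions are passed through `masker`.
    Returns the number of rows written.
    """
    wanted = set(columns)
    writer = csv.writer(sink, lineterminator="\n")
    rows = 0
    for row in csv.reader(source):  # handles double-quoted fields
        writer.writerow(
            [masker(v) if i in wanted else v for i, v in enumerate(row)]
        )
        rows += 1
    return rows


def demo():
    """Mask columns 0, 5 and 7 of a single in-memory sample row."""
    source = io.StringIO('id1,"Smith, John",a,b,c,555-1234,d,secret\n')
    sink = io.StringIO()
    mask_csv_stream(source, sink, columns=[0, 5, 7])
    return sink.getvalue()


if __name__ == "__main__":
    print(demo(), end="")
```

      Any callable that takes and returns a string can be passed as masker, which is one way to keep the masking logic pluggable. When reading real files with this module, open them with newline="" so that quoted fields containing line breaks are handled correctly.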



      Attachments:
        1. sample2g.csv.tar.gz (9.91 MB, Obaidul Karim)
        2. jstat2.txt (2 kB, Obaidul Karim)
        3. jstat.txt (3 kB, Obaidul Karim)
        4. iostat2.txt (3 kB, Obaidul Karim)
        5. iostat.txt (6 kB, Obaidul Karim)
        6. CSVProcessor_to_HDFS.xml (35 kB, Obaidul Karim)



            • Assignee:
              obaid Obaidul Karim
            • Votes:
              0
            • Watchers:
              3


                Time Tracking

                Original Estimate - 672h
                Remaining Estimate - 672h
                Time Spent - Not Specified