  1. Apache NiFi
  2. NIFI-1064

Create a blog for how to pull files, decompress, prepare, drive into HDFS



    • Type: Task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels:



      So, I stumbled onto NiFi at a Laurel, MD Spark meetup and was pretty
      excited about using it. I'm running HDP and need to construct an
      ETL-like flow and would like to start, as a new user to NiFi, using
      a "best practice" approach. Wondering if some of you more seasoned
      users might provide some thoughts on my problem?

      1. 160 zip files/day show up on an NFS share in various
      subdirectories and their filenames contain the yyyymmddHHMMSS of when
      the stats were generated.
      2. Each zip file contains 4 or more large CSV files
      3. I need just one of those CSVs from each zip file each day and they
      all add up to about 10GB uncompressed
      4. I need to extract that one file from each zip, strip off the first
      line (the headers), and store it in HDFS compressed again using gzip
      or snappy
      5. I cannot delete the NFS file after the copy to HDFS because others
      need access to it for some time

      So, where I am having a hard time visualizing doing this in NiFi is
      with the first step. I need to scan the NFS files after 8 AM every day
      (when I know all files for the previous 24 hours will be present),
      find that set of files for that day using the yyyymmdd part of the
      file names, then extract the one file I need and process it into HDFS.

      I could imagine a processor that runs once every 24 hours on a cron
      schedule. I could imagine running an ExecuteProcess processor against
      a bash script to get the list of all the files that match the
      yyyymmdd. Then I get stuck. How do I take this list of 160 file paths
      and start processing each of them in parallel through the ETL flow?
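
The file-listing step described above could be sketched in Python roughly as follows. This is only an illustration, not part of NiFi: the function names `zips_for_day` and `previous_day` are hypothetical, and it assumes the 14-digit yyyymmddHHMMSS stamp appears as a standalone run of digits somewhere in each zip file name.

```python
import os
import re
from datetime import date, timedelta

# Assumption: the file name contains exactly one standalone 14-digit
# yyyymmddHHMMSS stamp; only the yyyymmdd part is captured.
STAMP = re.compile(r"(?<!\d)(\d{8})\d{6}(?!\d)")


def zips_for_day(root, day):
    """Return sorted paths of .zip files under root stamped with the given day."""
    wanted = day.strftime("%Y%m%d")
    matches = []
    # os.walk descends into every subdirectory of the share.
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if not name.lower().endswith(".zip"):
                continue
            found = STAMP.search(name)
            if found and found.group(1) == wanted:
                matches.append(os.path.join(dirpath, name))
    return sorted(matches)


def previous_day(today=None):
    """Return the calendar day before today (or before the given date)."""
    today = today or date.today()
    return today - timedelta(days=1)
```

A scheduled job running after 8 AM could call `zips_for_day(root, previous_day())` and emit one path per line, which is the shape of output the bash script in the question would produce. Nothing is deleted or moved, so the files stay on the share for other users.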

      Thanks in advance for any ideas

      Lee Laim
      2:27 AM (9 hours ago)

      to users

      I'm far from seasoned but I'll take a swing at it to check my understanding (or lack thereof).
      I'd break the task into 2 parts:

      Identify and move files to a staging location, then process the zip files from the staging location.

      Run a cron-driven GenerateFlowFile processor to start the process every 24 hours after 8 AM ->

      ExecuteStreamCommand to run your bash script to stream the 160 files of interest into ->

      SplitText processor to generate a new flow file for each zip filename. This can be routed into a

      DistributeLoad processor which will distribute the flow files to

      ExtractText processors to extract the text out of the flow file (extract contents: filename and path), then pass to

      UpdateAttribute of the flow file to be able to access the filename and path via the NiFi expression language. Pass the flow file to

      ExecuteStreamCommand (cp /${path_attribute}/${filename} /location2/${filename}). This will copy the zip file to another directory (location2), to keep the files at the source for other users.

      GetFile from location2 -> UnpackContent -> RouteOnAttribute (to select the CSV of interest, discard the rest) -> ExecuteStreamCommand (sed '1d') to remove the header -> CompressContent -> PutHDFS
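
As a rough illustration of what the last chain (unpack, select one CSV, drop the header, compress) does to a single zip file, here is a minimal Python sketch. It is not how NiFi implements these processors; the function name `strip_header_and_gzip` and its parameters are hypothetical, and the caller is assumed to know the name of the CSV member it wants.

```python
import gzip
import shutil
import zipfile


def strip_header_and_gzip(zip_path, member_name, out_path):
    """Write member_name from the zip to out_path as gzip, minus its first line."""
    with zipfile.ZipFile(zip_path) as archive:
        # Only the requested member is opened; the other CSVs are ignored.
        with archive.open(member_name) as member, gzip.open(out_path, "wb") as out:
            member.readline()  # discard the header line, like sed '1d'
            # Stream the remainder so a large CSV is never held fully in memory.
            shutil.copyfileobj(member, out)
```

The source zip is only read, never modified or removed, which matches the requirement that the NFS copy stay in place.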

      Hope this helps, and I hope this isn't too far off.


      Mark Payne via nifi.apache.org
      9:45 AM (1 hour ago)

      to users
      Hey Mark,

      Thanks for sharing your use case with us in pretty good detail so that we can understand
      what you're trying to do here.

      There are actually a few processors coming in the next release that I think should help here.
      First, there's the FetchFile processor that you noticed in NIFI-631. Hopefully the ListFile will
      make its way in there as well because it's much easier that way. In either case, you can right-click
      on the Processor and click Configure. If you go to the Scheduling tab, you can change the Scheduling
      Strategy to CRON-Driven and set the schedule to run whenever you'd like.

      As-is, the GetFile is expected to remove the file from the current location, as the idea was
      that NiFi would sort of assume ownership of the file. It turns out that in the Open Source
      world, that's often not desirable, so we are moving more toward the List/Fetch pattern as
      described in that ticket.

      Once you pull the files into NiFi, though, UnpackContent should unzip the files, each into its
      own FlowFile. You could then use a RouteOnAttribute to pull out just the file that you care about,
      based on its filename. You can then allow the others to be routed to Unmatched and auto-terminate
      them from the flow.

      Stripping off the first line could probably be done using ReplaceText, but in the next version
      of NiFi, we will have a RouteText processor that should make working with CSVs far easier. You could,
      for instance, route any line that begins with # to one relationship and the rest to a second relationship.
      This effectively allows you to filter out the header line.
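
The line-routing idea can be pictured with a small Python sketch. This only mimics the concept of sending lines to two destinations by prefix; it is not the RouteText processor or its configuration, and the name `route_lines` is hypothetical. It also assumes the header line really does start with the chosen prefix.

```python
def route_lines(text, prefix="#"):
    """Split text into (lines starting with prefix, all other lines)."""
    matched, unmatched = [], []
    # keepends=True preserves each line's original newline.
    for line in text.splitlines(keepends=True):
        if line.startswith(prefix):
            matched.append(line)
        else:
            unmatched.append(line)
    return "".join(matched), "".join(unmatched)
```

For example, `route_lines("#id,name\n1,a\n2,b\n")` returns `("#id,name\n", "1,a\n2,b\n")`, so keeping only the second element drops the header.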

      Finally, you can use PutHDFS and set the Compression Codec to whatever you prefer: GZIP, Snappy, etc.
      Prior to that, if you need to, you could also add in a MergeContent processor in order to concatenate
      together these CSV files in order to make them larger.
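
To make the concatenate-then-compress step concrete, here is a minimal Python sketch of merging several header-less CSV bodies into one gzip payload. It is an illustration only, not MergeContent or PutHDFS behavior; `merge_and_gzip` is a hypothetical name, and it assumes the parts are small enough to hold in memory.

```python
import gzip


def merge_and_gzip(parts):
    """Concatenate CSV byte strings and return them as one gzip payload."""
    merged = bytearray()
    for part in parts:
        merged += part
        # Guard against a part without a trailing newline, so the last row
        # of one file does not run into the first row of the next.
        if part and not part.endswith(b"\n"):
            merged += b"\n"
    return gzip.compress(bytes(merged))
```

Fewer, larger files generally suit HDFS better than many small ones, which is the motivation for merging before the write.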




      Thanks for jumping in Lee!


      This is a great writeup. We should turn this into a blog w/full explanation and template. Great use case and you just gave us a perfect user perspective/explanation of how you're thinking of it.

      We will make that happen quickly.





            • Assignee:
              joewitt Joe Witt