Apache NiFi / NIFI-5064

Fixes and improvements to PutKudu processor

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.7.0
    • Component/s: None
    • Labels: None

    Description

      1. Currently, PutKudu fails with NPE on null or missing values.

      2. PutKudu throws IllegalArgumentException on 16-bit integer columns because of a missing break in the case clause for INT16 columns.

      3. It also throws IllegalArgumentException on 8-bit integer columns. We need a separate case clause for INT8 columns, where PartialRow#addByte should be used instead of PartialRow#addShort.
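
      Items 1 to 3 can be sketched as follows. This is a minimal illustration, not the code from the pull request: Row and ColType are hypothetical stand-ins that only borrow the PartialRow method names (addByte, addShort, addInt, setNull), so the example compiles without kudu-client on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; Row and ColType are stand-ins, not kudu-client classes. */
final class TypeDispatchSketch {

    /** Stand-in for the integer column types discussed in the issue. */
    enum ColType { INT8, INT16, INT32 }

    /** Hypothetical stand-in for the subset of PartialRow used here. */
    interface Row {
        void addByte(String column, byte value);
        void addShort(String column, short value);
        void addInt(String column, int value);
        void setNull(String column);
    }

    /** Writes one value: null is handled first, and each integer width has its own case. */
    static void addValue(Row row, String column, ColType type, Object value) {
        if (value == null) {
            row.setNull(column); // avoids dereferencing a null or missing value
            return;
        }
        Number n = (Number) value;
        switch (type) {
            case INT8:
                row.addByte(column, n.byteValue());
                break;
            case INT16:
                row.addShort(column, n.shortValue());
                break; // without this break, control falls through to the next case
            case INT32:
                row.addInt(column, n.intValue());
                break;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /** Test double that records which setter was called for each column. */
    static final class RecordingRow implements Row {
        final List<String> calls = new ArrayList<>();

        public void addByte(String column, byte value) { calls.add("addByte:" + column + "=" + value); }
        public void addShort(String column, short value) { calls.add("addShort:" + column + "=" + value); }
        public void addInt(String column, int value) { calls.add("addInt:" + column + "=" + value); }
        public void setNull(String column) { calls.add("setNull:" + column); }
    }
}
```

      Checking for null before the switch keeps the null handling in one place instead of repeating it in every case.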

      4. NIFI-4384 added a batch size parameter; however, it only applies to FlowFiles with multiple records. A KuduSession is created and closed for each FlowFile, so if a FlowFile contains only a single record, no batching takes place. A workaround would be to use a preprocessor to concatenate multiple FlowFiles, but since PutHBase and PutSQL use session.get(batchSize) to handle multiple FlowFiles at once, I think we can take the same approach in PutKudu, as it simplifies the data flow.
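
      The batching idea in item 4 could look roughly like this. It is a sketch under stated assumptions: the queue of strings stands in for the incoming FlowFiles, take() stands in for session.get(batchSize), and Session is a hypothetical placeholder for the write session that is opened once per batch rather than once per FlowFile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch only; none of these types are NiFi or Kudu classes. */
final class BatchSketch {

    /** Hypothetical stand-in for a write session shared by one batch. */
    static final class Session {
        final List<String> applied = new ArrayList<>();
        boolean closed;

        void apply(String record) { applied.add(record); }
        void close() { closed = true; }
    }

    /** Removes up to batchSize items from the queue, in the spirit of session.get(batchSize). */
    static List<String> take(Queue<String> incoming, int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !incoming.isEmpty()) {
            batch.add(incoming.poll());
        }
        return batch;
    }

    /** Handles one batch: a single session serves every FlowFile taken from the queue. */
    static Session processBatch(Queue<String> incoming, int batchSize) {
        List<String> flowFiles = take(incoming, batchSize);
        Session session = new Session();
        try {
            for (String flowFile : flowFiles) {
                session.apply(flowFile); // single-record FlowFiles now share one session
            }
        } finally {
            session.close();
        }
        return session;
    }
}
```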

      5. PutKudu depends on kudu-client 1.3.0, but we can safely update to 1.7.0.

      A notable change in Kudu 1.7.0 is the addition of the Decimal type.

      6. PutKudu has Skip head line property for ignoring the first record in a FlowFile. I suppose this was added to handle header lines in CSV files, but I really don't think it's something PutKudu should handle. CSVReader already has Treat First Line as Header option, so we should tell the users to use it instead as we don't want to have the same option here and there. Also, the default value of Skip head line is true, and I found it very confusing as my use case was to stream-process single-record FlowFiles. We can keep this property for backward compatibility, but we should at least deprecate it and change the default value to false.

      7. Server-side errors such as uniqueness constraint violations are not checked and are simply ignored. When flush mode is set to AUTO_FLUSH_SYNC, we should check the return value of KuduSession#apply to see whether it has a RowError, but PutKudu currently ignores it. For example, on a uniqueness constraint violation, we get a RowError saying "Already present: key already present (error 0)".

      On the other hand, when flush mode is set to AUTO_FLUSH_BACKGROUND, KuduSession#apply, understandably, returns null, and we should check the return value of KuduSession#getPendingErrors(). And when the mode is MANUAL_FLUSH, we should examine the return value of KuduSession#flush() or KuduSession#close(). In this case, we also have to make sure that we don't overflow the mutation buffer of KuduSession, by calling flush() before it is too late.
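
      The per-mode error checks in item 7 can be sketched as below. This is an illustration, not the pull request code: Session and Response are hypothetical stand-ins with simplified return types (the real KuduSession#getPendingErrors() does not return a plain list of strings), FakeSession is an in-memory fake that rejects duplicate keys, and flushEvery is an assumed parameter for flushing before the buffer fills.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch only; Session and Response are stand-ins, not kudu-client classes. */
final class FlushModeSketch {

    enum FlushMode { AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH }

    /** Stand-in for a per-operation response; rowError is null on success. */
    static final class Response {
        final String rowError;
        Response(String rowError) { this.rowError = rowError; }
        boolean hasRowError() { return rowError != null; }
    }

    /** Stand-in for the session calls named in the description (return types simplified). */
    interface Session {
        Response apply(String key);
        List<Response> flush();
        List<String> getPendingErrors();
    }

    /** Applies all keys and returns every row error, looking where each flush mode reports it. */
    static List<String> write(Session session, FlushMode mode, List<String> keys, int flushEvery) {
        List<String> errors = new ArrayList<>();
        int buffered = 0;
        for (String key : keys) {
            Response response = session.apply(key);
            if (mode == FlushMode.AUTO_FLUSH_SYNC && response != null && response.hasRowError()) {
                errors.add(response.rowError);
            }
            if (mode == FlushMode.MANUAL_FLUSH && ++buffered >= flushEvery) {
                collect(session.flush(), errors); // flush before the buffer can overflow
                buffered = 0;
            }
        }
        if (mode == FlushMode.MANUAL_FLUSH) {
            collect(session.flush(), errors); // flush whatever is still buffered
        } else if (mode == FlushMode.AUTO_FLUSH_BACKGROUND) {
            errors.addAll(session.getPendingErrors());
        }
        return errors;
    }

    private static void collect(List<Response> responses, List<String> errors) {
        for (Response response : responses) {
            if (response.hasRowError()) {
                errors.add(response.rowError);
            }
        }
    }

    /** In-memory fake that rejects duplicate keys; used only to exercise write(). */
    static final class FakeSession implements Session {
        private final FlushMode mode;
        private final Set<String> stored = new HashSet<>();
        private final List<String> buffer = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();
        int maxBuffered = 0;

        FakeSession(FlushMode mode) { this.mode = mode; }

        private Response store(String key) {
            return stored.add(key) ? new Response(null) : new Response("Already present: " + key);
        }

        public Response apply(String key) {
            if (mode == FlushMode.AUTO_FLUSH_SYNC) {
                return store(key);
            }
            if (mode == FlushMode.AUTO_FLUSH_BACKGROUND) {
                Response response = store(key);
                if (response.hasRowError()) {
                    pending.add(response.rowError);
                }
                return null;
            }
            buffer.add(key); // MANUAL_FLUSH: nothing is written until flush()
            maxBuffered = Math.max(maxBuffered, buffer.size());
            return null;
        }

        public List<Response> flush() {
            List<Response> responses = new ArrayList<>();
            for (String key : buffer) {
                responses.add(store(key));
            }
            buffer.clear();
            return responses;
        }

        public List<String> getPendingErrors() {
            List<String> errors = new ArrayList<>(pending);
            pending.clear();
            return errors;
        }
    }
}
```

      With this shape, a duplicate key surfaces as an error in all three modes instead of being silently dropped.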


      I'll create a pull request on GitHub. Since there are multiple issues to be addressed, I made separate commits for each issue mentioned above so that it's easier to review. You might want to squash them into one, or cherry-pick a subset of commits if you don't agree with some decisions I made.

      Please let me know what you think. We deployed the code to a production server last week, and it has been running since then without any issues, steadily processing 20K records/second.

            People

              Assignee: Unassigned
              Reporter: Junegunn Choi (junegunn)