Details

    • Type: New Feature
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: Extensions
    • Labels: None

      Description

      We need a processor that is capable of validating that all Records in a FlowFile adhere to the proper schema.

      The Processor should be configured with a Record Reader and should route each record to either 'valid' or 'invalid' based on whether or not the record adheres to the reader's schema. A record would be invalid in any of the following cases:

      • Missing field that is required according to the schema
      • Extra field that is not present in schema (it should be configurable whether or not this is a failure)
      • Field requires coercion while strict type checking is enabled (this should also be configurable)
      • Field is invalid, such as the value "hello" when it should be an integer
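
The four cases above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the ValidateRecord implementation: `SketchField`, `RecordValidationSketch`, and the two boolean flags are hypothetical names, records are modelled as plain maps rather than NiFi `Record` objects, and only String-to-Integer coercion is modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified schema field; not the NiFi RecordField API. */
class SketchField {
    final Class<?> type;
    final boolean required;

    SketchField(Class<?> type, boolean required) {
        this.type = type;
        this.required = required;
    }
}

class RecordValidationSketch {

    /** Returns the reasons a record is invalid; an empty list means the record is valid. */
    static List<String> validate(Map<String, Object> record, Map<String, SketchField> schema,
                                 boolean allowExtraFields, boolean strictTypeChecking) {
        List<String> problems = new ArrayList<>();

        for (Map.Entry<String, SketchField> entry : schema.entrySet()) {
            String name = entry.getKey();
            SketchField field = entry.getValue();
            Object value = record.get(name);

            // Assumption: a null value is treated the same as an absent field.
            if (value == null) {
                if (field.required) {
                    problems.add("missing required field: " + name);
                }
                continue;
            }
            if (field.type.isInstance(value)) {
                continue; // value already has the declared type
            }
            if (strictTypeChecking) {
                // Strict mode: any value that would need coercion is rejected.
                problems.add("field requires coercion: " + name);
            } else if (!isCoercible(value, field.type)) {
                // Lenient mode: only values that cannot be coerced are rejected.
                problems.add("invalid value for field: " + name);
            }
        }

        if (!allowExtraFields) {
            for (String name : record.keySet()) {
                if (!schema.containsKey(name)) {
                    problems.add("extra field: " + name);
                }
            }
        }
        return problems;
    }

    /** Only String-to-Integer coercion is modelled, to keep the sketch short. */
    private static boolean isCoercible(Object value, Class<?> target) {
        if (target == Integer.class && value instanceof String) {
            try {
                Integer.parseInt((String) value);
                return true;
            } catch (NumberFormatException e) {
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, SketchField> schema = new LinkedHashMap<>();
        schema.put("id", new SketchField(Integer.class, true));
        schema.put("name", new SketchField(String.class, false));

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", "hello"); // cannot be coerced to an integer
        record.put("color", "red"); // not declared in the schema

        System.out.println(validate(record, schema, false, false));
    }
}
```

A processor built along these lines would route a record to 'valid' when the returned list is empty and to 'invalid' otherwise.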

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user markap14 opened a pull request:

          https://github.com/apache/nifi/pull/2015

          NIFI-4142: Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord.

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you
          to ensure the following steps have been taken:

          For all changes:
          • [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
          • [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [ ] Is your initial contribution a single, squashed commit?

          For code changes:
          • [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [ ] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

          For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

          Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/markap14/nifi NIFI-4142

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/2015.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2015


          commit ad5c46fe7646103021556178726254f2cbb0b8a0
          Author: Mark Payne <markap14@hotmail.com>
          Date: 2017-06-30T12:32:01Z

          NIFI-4142: Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord.


          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r127738279

          --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java ---
          @@ -15,14 +15,16 @@
            * limitations under the License.
            */

          -package org.apache.nifi.serialization.record;
          +package org.apache.nifi.serialization;

          -public class TypeMismatchException extends RuntimeException {
          --- End diff --

          I don't think it is OK to change this exception class name at this juncture, and even if it is questionably OK, the juice is probably not worth the squeeze. TypeMismatch and SchemaValidation are pretty much the same thing.

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r127738787

          --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java ---
          @@ -38,14 +38,35 @@
           public interface RecordReader extends Closeable {

               /**
          -     * Returns the next record in the stream or <code>null</code> if no more records are available.
          +     * Returns the next record in the stream or <code>null</code> if no more records are available. Schema enforcement will be enabled.
                *
                * @return the next record in the stream or <code>null</code> if no more records are available.
                *
                * @throws IOException if unable to read from the underlying data
                * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
          +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
                */
          -    Record nextRecord() throws IOException, MalformedRecordException;
          +    default Record nextRecord() throws IOException, MalformedRecordException {
          +        return nextRecord(true);
          +    }
          +
          +    /**
          +     * Reads the next record from the underlying stream. If schema enforcement is enabled, then any field in the Record whose type does not
          +     * match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into
          +     * the correct type. If schema enforcement is disabled, then no type coercion will occur. As a result, calling
          +     * {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)}
          +     * may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer.
          +     *
          +     * @param enforceSchema whether or not fields in the Record should be validated against the schema and coerced when necessary
          +     *
          +     * @return the next record in the stream or <code>null</code> if no more records are available
          +     * @throws IOException if unable to read from the underlying data
          +     * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field
          +     * that violates the schema and cannot be coerced into the appropriate field type.
          +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate
          +     * field type and schema enforcement is enabled
          +     */
          +    Record nextRecord(boolean enforceSchema) throws IOException, MalformedRecordException;
          --- End diff --

          The schema had always been enforced, arguably just with a sense of leniency. I think this method parameter should be 'strictSchemaEnforcement' or 'enforceStrictSchema'.

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r127738912

          --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java ---
          @@ -38,14 +38,35 @@
           public interface RecordReader extends Closeable {

               /**
          -     * Returns the next record in the stream or <code>null</code> if no more records are available.
          +     * Returns the next record in the stream or <code>null</code> if no more records are available. Schema enforcement will be enabled.
                *
                * @return the next record in the stream or <code>null</code> if no more records are available.
                *
                * @throws IOException if unable to read from the underlying data
                * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
          +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
                */
          -    Record nextRecord() throws IOException, MalformedRecordException;
          +    default Record nextRecord() throws IOException, MalformedRecordException {
          --- End diff --

          We should indicate whether the schema enforcement strictness is 'lenient' or 'strict'.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r128764060

          --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java ---
          @@ -15,14 +15,16 @@
            * limitations under the License.
            */

          -package org.apache.nifi.serialization.record;
          +package org.apache.nifi.serialization;

          -public class TypeMismatchException extends RuntimeException {
          --- End diff --

          I don't agree that they are pretty much the same thing. TypeMismatchException is very specific. SchemaValidationException can be much more broad. For instance, if a required field is missing, that is not a Type Mismatch, but it is a Schema Validation.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r128806560

          --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java ---
          @@ -38,14 +38,35 @@
           public interface RecordReader extends Closeable {

               /**
          -     * Returns the next record in the stream or <code>null</code> if no more records are available.
          +     * Returns the next record in the stream or <code>null</code> if no more records are available. Schema enforcement will be enabled.
                *
                * @return the next record in the stream or <code>null</code> if no more records are available.
                *
                * @throws IOException if unable to read from the underlying data
                * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
          +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
                */
          -    Record nextRecord() throws IOException, MalformedRecordException;
          +    default Record nextRecord() throws IOException, MalformedRecordException {
          +        return nextRecord(true);
          +    }
          +
          +    /**
          +     * Reads the next record from the underlying stream. If schema enforcement is enabled, then any field in the Record whose type does not
          +     * match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into
          +     * the correct type. If schema enforcement is disabled, then no type coercion will occur. As a result, calling
          +     * {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)}
          +     * may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer.
          +     *
          +     * @param enforceSchema whether or not fields in the Record should be validated against the schema and coerced when necessary
          +     *
          +     * @return the next record in the stream or <code>null</code> if no more records are available
          +     * @throws IOException if unable to read from the underlying data
          +     * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field
          +     * that violates the schema and cannot be coerced into the appropriate field type.
          +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate
          +     * field type and schema enforcement is enabled
          +     */
          +    Record nextRecord(boolean enforceSchema) throws IOException, MalformedRecordException;
          --- End diff --

          I think I actually want to just separate the concept out into two different variables here: boolean coerceTypes, boolean dropUnknownRecords. That way it is very explicit what is happening, and I don't think that 'strict' vs. 'lenient' really conveys those two semantics as well as I'd like.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on the issue:

          https://github.com/apache/nifi/pull/2015

          @joewitt I've pushed a new commit that I believe better clarifies how schemas are treated in terms of strictness vs. leniency by providing two arguments instead of 'enforceSchema': 'coerceTypes' and 'dropUnknownFields'
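
For readers following the thread, the two-argument shape described in this comment might look roughly like the sketch below. It is an illustration only, not the code from the pull request: `SketchRecordReader`, `InMemoryReaderSketch`, and `SketchSchemaValidationException` are hypothetical names, records are plain maps instead of NiFi `Record` objects, checked exceptions are omitted, and the defaults used by the no-argument overload are an assumption.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical unchecked exception standing in for a schema validation failure. */
class SketchSchemaValidationException extends RuntimeException {
    SketchSchemaValidationException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Simplified stand-in for the reader interface under review. */
interface SketchRecordReader {

    /** Assumed defaults: coerce types, keep fields the schema does not declare. */
    default Map<String, Object> nextRecord() {
        return nextRecord(true, false);
    }

    /** Returns the next record, or null when no more records are available. */
    Map<String, Object> nextRecord(boolean coerceTypes, boolean dropUnknownFields);
}

/** In-memory reader whose (hard-coded) schema is a single integer field named "id". */
class InMemoryReaderSketch implements SketchRecordReader {
    private final Deque<Map<String, Object>> rawRecords;

    InMemoryReaderSketch(List<Map<String, Object>> records) {
        this.rawRecords = new ArrayDeque<>(records);
    }

    @Override
    public Map<String, Object> nextRecord(boolean coerceTypes, boolean dropUnknownFields) {
        Map<String, Object> raw = rawRecords.poll();
        if (raw == null) {
            return null;
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : raw.entrySet()) {
            boolean known = "id".equals(entry.getKey());
            if (!known && dropUnknownFields) {
                continue; // field is not in the schema and the caller asked to drop it
            }
            Object value = entry.getValue();
            if (known && coerceTypes && value instanceof String) {
                try {
                    value = Integer.valueOf((String) value);
                } catch (NumberFormatException e) {
                    throw new SketchSchemaValidationException("Cannot coerce 'id' to an integer: " + value, e);
                }
            }
            result.put(entry.getKey(), value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("id", "7");
        raw.put("extra", "x");
        SketchRecordReader reader = new InMemoryReaderSketch(Arrays.asList(raw, raw));

        System.out.println(reader.nextRecord(true, true));   // coerced, unknown field dropped
        System.out.println(reader.nextRecord(false, false)); // the "raw record", untouched
    }
}
```

Passing `false` for both flags yields the "raw record" that a validating processor can then inspect on its own terms, which is the motivation given in the pull request title.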


            People

            • Assignee: Mark Payne (markap14)
            • Reporter: Mark Payne (markap14)
            • Votes: 0
            • Watchers: 3
