Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: Extensions
    • Labels: None

      Description

      We need a processor that is capable of validating that all Records in a FlowFile adhere to the proper schema.

      The Processor should be configured with a Record Reader and should route each record to either 'valid' or 'invalid' based on whether or not the record adheres to the reader's schema. A record would be invalid in any of the following cases:

      • Missing field that is required according to the schema
      • Extra field that is not present in schema (it should be configurable whether or not this is a failure)
      • Field requires type coercion and strict type checking is enabled (this should also be configurable)
      • Field value is invalid, such as the value "hello" where an integer is expected (an illustrative example follows below)
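
      For illustration, suppose the reader's schema were the following hypothetical Avro schema (this example is not taken from the ticket itself):

          {
            "type": "record",
            "name": "person",
            "fields": [
              { "name": "id",   "type": "int"    },
              { "name": "name", "type": "string" }
            ]
          }

      Against that schema, {"id": 1, "name": "John"} would be routed to 'valid', while {"name": "John"} (missing required field), {"id": 1, "name": "John", "nickname": "J"} (extra field, if configured to fail), and {"id": "hello", "name": "John"} (value that cannot be coerced to an integer) would each be routed to 'invalid'.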

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user markap14 opened a pull request:

          https://github.com/apache/nifi/pull/2015

          NIFI-4142: Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord.

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you
          to ensure the following steps have been taken:

              1. For all changes:
          • [ ] Is there a JIRA ticket associated with this PR? Is it referenced
            in the commit message?
          • [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [ ] Is your initial contribution a single, squashed commit?
              2. For code changes:
          • [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [ ] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
              3. For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
              4. Note:
                Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/markap14/nifi NIFI-4142

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/2015.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2015


          commit ad5c46fe7646103021556178726254f2cbb0b8a0
          Author: Mark Payne <markap14@hotmail.com>
          Date: 2017-06-30T12:32:01Z

          NIFI-4142: Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord.


          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r127738279

          — Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java —
          @@ -15,14 +15,16 @@

            * limitations under the License.
            */

           -package org.apache.nifi.serialization.record;
           +package org.apache.nifi.serialization;

           -public class TypeMismatchException extends RuntimeException {
          — End diff –

          I don't think it is OK to change this exception class name at this juncture, and even if it is arguably OK, the juice is probably not worth the squeeze. TypeMismatch and SchemaValidation are pretty much the same thing.

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r127738787

          — Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java —
          @@ -38,14 +38,35 @@
          public interface RecordReader extends Closeable {

                /**
           -     * Returns the next record in the stream or <code>null</code> if no more records are available.
           +     * Returns the next record in the stream or <code>null</code> if no more records are available. Schema enforcement will be enabled.
                 *
                 * @return the next record in the stream or <code>null</code> if no more records are available.
                 *
                 * @throws IOException if unable to read from the underlying data
                 * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
           +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
                 */
           -    Record nextRecord() throws IOException, MalformedRecordException;
           +    default Record nextRecord() throws IOException, MalformedRecordException {
           +        return nextRecord(true);
           +    }
           +
           +    /**
           +     * Reads the next record from the underlying stream. If schema enforcement is enabled, then any field in the Record whose type does not
           +     * match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into
           +     * the correct type. If schema enforcement is disabled, then no type coercion will occur. As a result, calling
           +     * {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)}
           +     * may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer.
           +     *
           +     * @param enforceSchema whether or not fields in the Record should be validated against the schema and coerced when necessary
           +     *
           +     * @return the next record in the stream or <code>null</code> if no more records are available
           +     * @throws IOException if unable to read from the underlying data
           +     * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field
           +     *             that violates the schema and cannot be coerced into the appropriate field type.
           +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate
           +     *             field type and schema enforcement is enabled
           +     */
           +    Record nextRecord(boolean enforceSchema) throws IOException, MalformedRecordException;

           — End diff –

          the schema has arguably always been enforced, just with a sense of leniency. I think this method parameter should be 'strictSchemaEnforcement' or 'enforceStrictSchema'.

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r127738912

          — Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java —
          @@ -38,14 +38,35 @@
          public interface RecordReader extends Closeable {

                /**
           -     * Returns the next record in the stream or <code>null</code> if no more records are available.
           +     * Returns the next record in the stream or <code>null</code> if no more records are available. Schema enforcement will be enabled.
                 *
                 * @return the next record in the stream or <code>null</code> if no more records are available.
                 *
                 * @throws IOException if unable to read from the underlying data
                 * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
           +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
                 */
           -    Record nextRecord() throws IOException, MalformedRecordException;
           +    default Record nextRecord() throws IOException, MalformedRecordException {

           — End diff –

          we should indicate whether the schema enforcement strictness is 'lenient' or 'strict'.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r128764060

          — Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java —
          @@ -15,14 +15,16 @@

            * limitations under the License.
            */

           -package org.apache.nifi.serialization.record;
           +package org.apache.nifi.serialization;

           -public class TypeMismatchException extends RuntimeException {
          — End diff –

          I don't agree that they are pretty much the same thing. TypeMismatchException is very specific. SchemaValidationException can be much more broad. For instance, if a required field is missing, that is not a Type Mismatch, but it is a Schema Validation.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r128806560

          — Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java —
          @@ -38,14 +38,35 @@
          public interface RecordReader extends Closeable {

                /**
           -     * Returns the next record in the stream or <code>null</code> if no more records are available.
           +     * Returns the next record in the stream or <code>null</code> if no more records are available. Schema enforcement will be enabled.
                 *
                 * @return the next record in the stream or <code>null</code> if no more records are available.
                 *
                 * @throws IOException if unable to read from the underlying data
                 * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
           +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
                 */
           -    Record nextRecord() throws IOException, MalformedRecordException;
           +    default Record nextRecord() throws IOException, MalformedRecordException {
           +        return nextRecord(true);
           +    }
           +
           +    /**
           +     * Reads the next record from the underlying stream. If schema enforcement is enabled, then any field in the Record whose type does not
           +     * match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into
           +     * the correct type. If schema enforcement is disabled, then no type coercion will occur. As a result, calling
           +     * {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)}
           +     * may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer.
           +     *
           +     * @param enforceSchema whether or not fields in the Record should be validated against the schema and coerced when necessary
           +     *
           +     * @return the next record in the stream or <code>null</code> if no more records are available
           +     * @throws IOException if unable to read from the underlying data
           +     * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field
           +     *             that violates the schema and cannot be coerced into the appropriate field type.
           +     * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate
           +     *             field type and schema enforcement is enabled
           +     */
           +    Record nextRecord(boolean enforceSchema) throws IOException, MalformedRecordException;

           — End diff –

          I think I actually want to just separate the concept out into two different variables here: boolean coerceTypes, boolean dropUnknownRecords. That way it is very explicit what is happening, and I don't think that 'strict' vs. 'lenient' really conveys those two semantics as well as I'd like.
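
          A minimal sketch of how a caller could use the resulting two-argument form (the boolean pair matches the signature described in the final commit message below; the readerFactory, flowFile, in, and logger variables are assumed for illustration, and this is a fragment, not code from the PR):

              // Read "raw" records: no type coercion, keep unknown fields, so a validator
              // can inspect exactly what was present in the incoming data.
              try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, logger)) {
                  Record record;
                  while ((record = reader.nextRecord(false, false)) != null) {   // (coerceTypes, dropUnknownFields)
                      // validate 'record' and route it to 'valid' or 'invalid'
                  }
              }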

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on the issue:

          https://github.com/apache/nifi/pull/2015

          @joewitt I've pushed a new commit that I believe better clarifies how schemas are treated in terms of strictness vs. leniency by providing two arguments instead of 'enforceSchema': 'coerceTypes' and 'dropUnknownFields'

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the issue:

          https://github.com/apache/nifi/pull/2015

          @markap14 can you please rebase, resolve the conflict, and go ahead and squash.

          markap14 Mark Payne added a comment -

          Joseph Witt rebased and squashed. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the issue:

          https://github.com/apache/nifi/pull/2015

          @markap14 looks like the option says "Used Reader's Schema". I'm assuming that is a typo and it should say "Use Reader's Schema". But that is not in this PR it appears. Did that sneak in elsewhere?

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2015#discussion_r132786485

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java —
          @@ -0,0 +1,457 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Optional;
          +import java.util.Set;
          +
          +import org.apache.avro.Schema;
          +import org.apache.avro.Schema.Parser;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.avro.AvroSchemaValidator;
          +import org.apache.nifi.avro.AvroTypeUtil;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.flowfile.attributes.CoreAttributes;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaField;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.schema.validation.SchemaValidationContext;
          +import org.apache.nifi.schema.validation.StandardSchemaValidator;
          +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
          +import org.apache.nifi.serialization.MalformedRecordException;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.RawRecordWriter;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
          +import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
          +import org.apache.nifi.serialization.record.validation.ValidationError;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
           +@Tags({"record", "schema", "validate"})
          +@CapabilityDescription("Validates the Records of an incoming FlowFile against a given schema. All records that adhere to the schema are routed to the \"valid\" relationship while "
          + + "records that do not adhere to hte schema are routed to the \"invalid\" relationship. It is therefore possible for a single incoming FlowFile to be split into two individual "
          + + "FlowFiles if some records are valid according to the schema and others are not. Any FlowFile that is routed to the \"invalid\" relationship will emit a ROUTE Provenance Event "
          + + "with the Details field populated to explain why records were invalid. In addition, to gain further explanation of why records were invalid, DEBUG-level logging can be enabled "
          + + "for the \"org.apache.nifi.processors.standard.ValidateRecord\" logger.")
          +public class ValidateRecord extends AbstractProcessor {
          +
          + static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name-property", "Use Schema Name Property",
          + "The schema to validate the data against is determined by looking at the 'Schema Name' Property and looking up the schema in the configured Schema Registry");
          + static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use Schema Text Property",
          + "The schema to validate the data against is determined by looking at the 'Schema Text' Property and parsing the schema as an Avro schema");
          + static final AllowableValue READER_SCHEMA = new AllowableValue("reader-schema", "Used Reader's Schema",
          — End diff –

          Ah here it is. "Used Reader's Schema" should be "Use Reader Schema".
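
          For reference, one plausible corrected declaration would read along these lines (a sketch only; the exact display name and description wording in the merged code may differ):

              static final AllowableValue READER_SCHEMA = new AllowableValue("reader-schema", "Use Reader's Schema",
                  "The schema to validate the data against is the schema that the configured Record Reader reports for the data");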

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the issue:

          https://github.com/apache/nifi/pull/2015

          went ahead and fixed the typo and squashed locally.

          Ran a test flow that created CSV of varying conformity to a simple spec. Ran it through various configurations of the validator, and it behaved as desired and expected in each case. When data was marked invalid, I was able to see precisely why through the provenance events, which was great. Was able to see coercion/non-coercion behavior, and everything checks out.

          +1 merging to master

          jira-bot ASF subversion and git services added a comment -

          Commit 451f9cf12407c4f7abeb8e538e3f55ecbf40abab in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=451f9cf ]

          NIFI-4142: This closes #2015. Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord. Updated Record Reader to take two parameters for nextRecord: (boolean coerceTypes) and (boolean dropUnknownFields)

          Signed-off-by: joewitt <joewitt@apache.org>

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/2015


            People

            • Assignee: markap14 Mark Payne
            • Reporter: markap14 Mark Payne
            • Votes: 0
            • Watchers: 4
