Details

    • Type: New Feature
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      NiFi has a ValidateXml processor to validate incoming XML files against a schema. It would be good to have one to validate JSON files as well.

      For example, an input JSON of:

      {
        "name": "Test",
        "timestamp": 1463499695,
        "tags": { "host": "Test_1", "ip": "1.1.1.1" },
        "fields": { "cpu": 10.2, "load": 15.6 }
      }

      Could be validated successfully against the following "schema":

      {
        "type": "object",
        "required": ["name", "tags", "timestamp", "fields"],
        "properties": {
          "name": { "type": "string" },
          "timestamp": { "type": "integer" },
          "tags": { "type": "object", "items": { "type": "string" } },
          "fields": { "type": "object" }
        }
      }

      There is at least one ASF-friendly library that could be used for implementation: https://github.com/everit-org/json-schema
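To make the request concrete, here is a minimal sketch of the checks the example schema describes, written against plain `java.util` maps rather than the everit-org library. The class `SchemaSketch`, its methods, and the hard-coded rules are illustrative assumptions; a real processor would presumably delegate to a JSON Schema implementation instead of hand-rolling these checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: hard-codes the rules of the example schema
// (required keys plus one expected type per property) instead of parsing a schema file.
final class SchemaSketch {

    // Returns a list of violations; an empty list means the document is valid.
    static List<String> validate(Map<String, Object> doc) {
        List<String> errors = new ArrayList<>();

        // "required": ["name", "tags", "timestamp", "fields"]
        for (String key : Arrays.asList("name", "tags", "timestamp", "fields")) {
            if (!doc.containsKey(key)) {
                errors.add("missing required property: " + key);
            }
        }

        // "properties": types are only checked for keys that are present.
        check(doc, "name", String.class, errors);
        check(doc, "tags", Map.class, errors);
        check(doc, "fields", Map.class, errors);
        if (doc.containsKey("timestamp")) {
            Object ts = doc.get("timestamp");
            if (!(ts instanceof Integer || ts instanceof Long)) {
                errors.add("timestamp must be an integer");
            }
        }
        return errors;
    }

    private static void check(Map<String, Object> doc, String key, Class<?> type, List<String> errors) {
        if (doc.containsKey(key) && !type.isInstance(doc.get(key))) {
            errors.add(key + " must be of type " + type.getSimpleName());
        }
    }

    // Builds the example input from the description as nested maps.
    static Map<String, Object> example() {
        Map<String, Object> tags = new LinkedHashMap<>();
        tags.put("host", "Test_1");
        tags.put("ip", "1.1.1.1");
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("cpu", 10.2);
        fields.put("load", 15.6);
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", "Test");
        doc.put("timestamp", 1463499695);
        doc.put("tags", tags);
        doc.put("fields", fields);
        return doc;
    }
}
```

Calling `SchemaSketch.validate(SchemaSketch.example())` yields an empty list; removing a required key or changing `timestamp` to a string adds one violation each.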

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user bartoszjkwozniak opened a pull request:

          https://github.com/apache/nifi/pull/1037

          NIFI-1893 Add processor for validating JSON

          Signed-off-by: Bartosz Wozniak <bartosz.jk.wozniak@gmail.com>

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/bartoszjkwozniak/nifi NIFI-1893

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1037.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1037


          commit 554d15fa994b74db67e2906de31a8fe001045d95
          Author: Bartosz Wozniak <bartosz.jk.wozniak@gmail.com>
          Date: 2016-09-19T11:37:54Z

          NIFI-1893 Add processor for validating JSON

          Signed-off-by: Bartosz Wozniak <bartosz.jk.wozniak@gmail.com>


          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79655327

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          +    public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          +            .name("Schema File")
          +            .description("The path to the Schema file that is to be used for validation")
          +            .required(true)
          +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          +            .build();
          +
          +    public static final Relationship REL_VALID = new Relationship.Builder()
          +            .name("valid")
          +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          +            .build();
          +    public static final Relationship REL_INVALID = new Relationship.Builder()
          +            .name("invalid")
          +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          +            .build();
          +
          +    private List<PropertyDescriptor> properties;
          +    private Set<Relationship> relationships;
          +    private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          +    @Override
          +    protected void init(final ProcessorInitializationContext context) {
          +        final List<PropertyDescriptor> properties = new ArrayList<>();
          +        properties.add(SCHEMA_FILE);
          +        this.properties = Collections.unmodifiableList(properties);
          +
          +        final Set<Relationship> relationships = new HashSet<>();
          +        relationships.add(REL_VALID);
          +        relationships.add(REL_INVALID);
          +        this.relationships = Collections.unmodifiableSet(relationships);
          +    }
          +
          +    @Override
          +    public Set<Relationship> getRelationships() {
          +        return relationships;
          +    }
          +
          +    @Override
          +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          +        return properties;
          +    }
          +
          +    @OnScheduled
          +    public void parseSchema(final ProcessContext context) throws IOException {
          +        try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          +            JSONTokener jsonTokener = new JSONTokener(inputStream);
          +            JSONObject jsonObject = new JSONObject(jsonTokener);
          +            Schema schema = SchemaLoader.load(jsonObject);
          +            this.schemaRef.set(schema);
          +        }
          +    }
          +
          +    @Override
          +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
          +        final List<FlowFile> flowFiles = session.get(50);
          — End diff –

          I would suggest not doing internal batching here. Some older processors did this because "@SupportsBatching" didn't exist yet. You already have this annotation, which lets the user set the batching duration instead of hiding it inside the implementation.
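One way to read this suggestion is that each trigger handles a single FlowFile and leaves batching to the framework. The sketch below shows only that control flow, using a hypothetical in-memory `Session` stand-in rather than NiFi's `ProcessSession`; the class name, the queue-backed session, and the placeholder validity check are all assumptions made so the example is self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in types; a real processor would use NiFi's
// ProcessSession and FlowFile instead of these.
final class SingleItemTriggerSketch {

    // Minimal queue-backed session: get() returns one item, or null when empty.
    static final class Session {
        private final Deque<String> queue = new ArrayDeque<>();
        final List<String> valid = new ArrayList<>();
        final List<String> invalid = new ArrayList<>();

        Session(List<String> incoming) {
            queue.addAll(incoming);
        }

        String get() {
            return queue.poll();
        }
    }

    // Handles exactly one item per invocation, with no internal loop over a batch.
    static void onTrigger(Session session) {
        String content = session.get();
        if (content == null) {
            return; // nothing queued
        }
        // Placeholder check standing in for real schema validation.
        if (content.trim().startsWith("{")) {
            session.valid.add(content);
        } else {
            session.invalid.add(content);
        }
    }
}
```

With this shape, how many items are grouped per commit is no longer fixed at 50 inside the method.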

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79654106

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          +    public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          +            .name("Schema File")
          — End diff –

          Could you use both ``.name()`` and ``.displayName()`` as explained here:
          https://mail-archives.apache.org/mod_mbox/nifi-dev/201605.mbox/%3C5A6FDF1E-1889-46FE-A3C4-5D2F0A905979@apache.org%3E
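A rough sketch of why the two values are kept separate, assuming the usual rationale that `.name()` is a stable identifier under which configuration is stored while `.displayName()` is only the label shown to users. The `Descriptor` class and `lookup` method below are hypothetical stand-ins, not the NiFi `PropertyDescriptor` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a property descriptor with a stable key and a UI label.
final class DescriptorNamingSketch {

    static final class Descriptor {
        final String name;        // stable identifier, used as the storage key
        final String displayName; // human-readable label, free to change

        Descriptor(String name, String displayName) {
            this.name = name;
            this.displayName = displayName;
        }
    }

    // Saved configuration is keyed by the stable name, never by the label.
    static String lookup(Map<String, String> savedConfig, Descriptor descriptor) {
        return savedConfig.get(descriptor.name);
    }

    // Example configuration as it might have been saved by an earlier release.
    static Map<String, String> savedConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("schema-file", "/tmp/schema.json");
        return config;
    }
}
```

Under this assumption, renaming the label from "Schema File" to something else in a later release leaves the stored value reachable, because the key `schema-file` does not change.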

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79655764

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          +    public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          +            .name("Schema File")
          +            .description("The path to the Schema file that is to be used for validation")
          +            .required(true)
          +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          +            .build();
          +
          +    public static final Relationship REL_VALID = new Relationship.Builder()
          +            .name("valid")
          +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          +            .build();
          +    public static final Relationship REL_INVALID = new Relationship.Builder()
          +            .name("invalid")
          +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          +            .build();
          +
          +    private List<PropertyDescriptor> properties;
          +    private Set<Relationship> relationships;
          +    private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          +    @Override
          +    protected void init(final ProcessorInitializationContext context) {
          +        final List<PropertyDescriptor> properties = new ArrayList<>();
          +        properties.add(SCHEMA_FILE);
          +        this.properties = Collections.unmodifiableList(properties);
          +
          +        final Set<Relationship> relationships = new HashSet<>();
          +        relationships.add(REL_VALID);
          +        relationships.add(REL_INVALID);
          +        this.relationships = Collections.unmodifiableSet(relationships);
          +    }
          +
          +    @Override
          +    public Set<Relationship> getRelationships() {
          +        return relationships;
          +    }
          +
          +    @Override
          +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          +        return properties;
          +    }
          +
          +    @OnScheduled
          +    public void parseSchema(final ProcessContext context) throws IOException {
          +        try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          +            JSONTokener jsonTokener = new JSONTokener(inputStream);
          +            JSONObject jsonObject = new JSONObject(jsonTokener);
          +            Schema schema = SchemaLoader.load(jsonObject);
          +            this.schemaRef.set(schema);
          +        }
          +    }
          +
          +    @Override
          +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
          +        final List<FlowFile> flowFiles = session.get(50);
          +        if (flowFiles.isEmpty()) {
          +            return;
          +        }
          +        final Schema schema = schemaRef.get();
          +        final ComponentLog logger = getLogger();
          +
          +        for (final FlowFile flowFile : flowFiles) {
          +            final AtomicBoolean valid = new AtomicBoolean(true);
          +            session.read(flowFile, new InputStreamCallback() {
          +                @Override
          +                public void process(final InputStream in) throws IOException {
          +                    try {
          +                        String str = IOUtils.toString(in, StandardCharsets.UTF_8);
          +                        if (str.startsWith("[")) {
          +                            schema.validate(new JSONArray(str)); // throws a ValidationException if this object is invalid
          +                        } else {
          +                            schema.validate(new JSONObject(str)); // throws a ValidationException if this object is invalid
          +                        }
          +                    } catch (final IllegalArgumentException | ValidationException e) {
          +                        valid.set(false);
          +                        logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
          +                    }
          +                }
          +            });
          +
          +            if (valid.get()) {
          +                logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
          — End diff –

          I'd recommend debug level; otherwise it'll become very verbose.
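A small sketch of the effect of moving the per-FlowFile success message to a lower level. It uses `java.util.logging` as a stand-in for NiFi's `ComponentLog` (an assumption made so the example is self-contained), with `Level.FINE` playing the role of debug; `LogLevelSketch` and `reportValid` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-in using java.util.logging; the processor itself logs through
// NiFi's ComponentLog, which this sketch does not model.
final class LogLevelSketch {

    static final Logger LOG = Logger.getLogger("ValidateJsonSketch");

    // The per-item success message goes to FINE, so it is dropped whenever
    // the logger is configured at INFO or above.
    static void reportValid(String flowFileId) {
        LOG.log(Level.FINE, "Successfully validated {0} against schema; routing to valid", flowFileId);
    }

    // True only when the logger is configured to emit FINE-level messages.
    static boolean successMessagesEnabled() {
        return LOG.isLoggable(Level.FINE);
    }
}
```

With the logger at `Level.INFO`, `reportValid` produces no output however many FlowFiles pass through; lowering the logger to `Level.FINE` turns the messages back on for troubleshooting.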

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79654708

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags( {"json", "schema", "validation"} )
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          — End diff –

          Would it be possible to have this property accept either a file path or a string representation of the schema? If the input given by the user is an existing file, we use the file as you propose; otherwise we use the string as the schema definition. I think that a lot of users would like to pass a schema definition without using a file (given that, in cluster mode, the file will have to be available on each node). Thoughts?
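
          A minimal sketch of that fallback, using only the JDK. The names SchemaSource and resolveSchemaText are hypothetical and not part of the PR; the returned text would still have to be parsed and loaded the way parseSchema already does for the file contents, and the property's FILE_EXISTS_VALIDATOR would presumably need to be relaxed for an inline schema to be accepted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of the PR): decides whether a property value
// names an existing schema file or is itself the schema definition.
final class SchemaSource {

    private SchemaSource() {
    }

    public static String resolveSchemaText(final String propertyValue) throws IOException {
        final String trimmed = propertyValue.trim();
        try {
            final Path path = Paths.get(trimmed);
            if (Files.isRegularFile(path)) {
                // The value points at an existing file: use the file contents as the schema.
                return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            }
        } catch (final InvalidPathException e) {
            // The value is not a legal path on this platform, so treat it as an inline schema.
        }
        // No such file: treat the value itself as the schema definition.
        return trimmed;
    }
}
```

          With something like this, a value such as {"type": "object"} is used directly, while a path to an existing file is read from disk on each node.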

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79655788

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags( {"json", "schema", "validation"} )
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + JSONObject jsonObject = new JSONObject(jsonTokener);
          + Schema schema = SchemaLoader.load(jsonObject);
          + this.schemaRef.set(schema);
          + }
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + final List<FlowFile> flowFiles = session.get(50);
          + if (flowFiles.isEmpty()) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + for (final FlowFile flowFile : flowFiles) {
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          + try {
          + String str = IOUtils.toString(in, StandardCharsets.UTF_8);
          + if (str.startsWith("[")) {
          + schema.validate(new JSONArray(str)); // throws a ValidationException if this object is invalid
          + } else {
          + schema.validate(new JSONObject(str)); // throws a ValidationException if this object is invalid
          + }
          + } catch (final IllegalArgumentException | ValidationException e) {
          + valid.set(false);
          + logger.debug("Failed to validate {} against schema due to {}", new Object[] {flowFile, e} );
          + }
          + }
          + });
          +
          + if (valid.get()) {
          + logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[] {flowFile});
          + session.getProvenanceReporter().route(flowFile, REL_VALID);
          + session.transfer(flowFile, REL_VALID);
          + } else {
          + logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile} );
          — End diff –

          I'd recommend debug level; otherwise it'll become very verbose.

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79657850

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags( {"json", "schema", "validation"} )
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + JSONObject jsonObject = new JSONObject(jsonTokener);
          + Schema schema = SchemaLoader.load(jsonObject);
          + this.schemaRef.set(schema);
          + }
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + final List<FlowFile> flowFiles = session.get(50);
          + if (flowFiles.isEmpty()) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + for (final FlowFile flowFile : flowFiles) {
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          — End diff –

          An IOException here looks like it would cause a session rollback. Is that the intended behavior, or should something catch it and route the FlowFile to invalid or failure?
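
One way to read the question above is as a three-way routing decision. The following is only a minimal sketch in plain Java, not NiFi code: `RoutingSketch`, `Route`, and `ContentCheck` are hypothetical stand-ins, and the `FAILURE` outcome assumes a third relationship that the pull request does not define. It shows the read error being caught and turned into a routing result instead of propagating.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the routing decision; none of these types are NiFi API.
class RoutingSketch {
    // FAILURE is an assumed third outcome; the PR only defines valid and invalid.
    enum Route { VALID, INVALID, FAILURE }

    // Hypothetical content check; the real processor would validate against the schema.
    interface ContentCheck {
        boolean isValid(String content);
    }

    static Route route(InputStream in, ContentCheck check) {
        try {
            // Read the whole stream into memory, as the processor under review does.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            String content = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
            return check.isValid(content) ? Route.VALID : Route.INVALID;
        } catch (IOException e) {
            // A read error says nothing about validity, so it gets its own route
            // instead of escaping and rolling back the whole batch.
            return Route.FAILURE;
        }
    }
}
```

Whether a read error belongs on a separate failure route or on invalid is a design choice for the processor author; the sketch only illustrates that the exception need not escape the callback.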

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79657612

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          — End diff –

          For NiFi clusters, it may not be prudent (or as easy as desired) to have the schema file available on all the nodes. However, this is still a good property to have. Perhaps there could also be a Schema Body property, where the schema can be pasted in? That might help for portability, templates, etc.

          In that case you'd also want a custom Validator that ensures that only one of the two is set. There is an example in [AbstractScriptProcessor](https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java#L121) for this.
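
The "only one of the two" rule can be sketched independently of NiFi. The snippet below is a plain-Java illustration under assumptions: `SchemaSourceCheck` and its `validate` method are hypothetical names, and in a real processor the same check would presumably live in a `customValidate` override and return NiFi validation results rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the mutual-exclusion rule; not NiFi API.
class SchemaSourceCheck {

    // Returns a list of problems; an empty list means the configuration is acceptable.
    static List<String> validate(String schemaFile, String schemaBody) {
        boolean fileSet = schemaFile != null && !schemaFile.trim().isEmpty();
        boolean bodySet = schemaBody != null && !schemaBody.trim().isEmpty();

        List<String> problems = new ArrayList<>();
        if (fileSet && bodySet) {
            problems.add("Only one of Schema File or Schema Body may be set");
        } else if (!fileSet && !bodySet) {
            problems.add("One of Schema File or Schema Body must be set");
        }
        return problems;
    }
}
```

Treating blank strings as unset is an assumption made here so that an empty pasted schema does not count as a configured one.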

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79656833

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          — End diff –

          Lately the conventional wisdom for the name() method is to choose a machine-friendly name (like "validate-json-schema-file") and use displayName() for the user-friendly name. This will help when we do more work with internationalization. Not a requirement, just a suggestion.
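
To illustrate why the split helps, here is a small plain-Java sketch. `DescriptorSketch` is a hypothetical stand-in, not NiFi's `PropertyDescriptor`, and the saved-configuration map is an assumption about how values might be keyed. The point it shows is that a value stored under the machine-friendly name is still found after the user-facing label changes or is translated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a property descriptor; not the NiFi class.
class DescriptorSketch {
    final String name;        // stable, machine-friendly key
    final String displayName; // label shown to users, free to change or be translated

    DescriptorSketch(String name, String displayName) {
        this.name = name;
        this.displayName = displayName;
    }

    // Saved configuration is assumed to be keyed by the stable name only.
    static String lookup(Map<String, String> savedConfig, DescriptorSketch descriptor) {
        return savedConfig.get(descriptor.name);
    }

    static Map<String, String> sampleConfig() {
        Map<String, String> saved = new HashMap<>();
        saved.put("validate-json-schema-file", "/tmp/schema.json"); // example value only
        return saved;
    }
}
```

With a single user-facing string doing both jobs, renaming the label would orphan any configuration stored under the old text.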

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79778565

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          — End diff –

          Added Schema Body property and custom validator - exactly one of Schema File or Schema Body must be set.
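The "exactly one of" rule described in this comment can be sketched without any NiFi dependencies. The following is only an illustration under stated assumptions: the class name `SchemaSourceCheck` and its methods are hypothetical and do not come from the pull request, and the two property values are passed in as plain strings. In a real processor a check like this would presumably sit in a `customValidate` override and report its findings as validation results rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative only: a framework-free version of an "exactly one of" rule.
 * The class and method names are hypothetical, not taken from the pull request.
 */
final class SchemaSourceCheck {

    private SchemaSourceCheck() {
    }

    /** Treats null and whitespace-only values as "not set". */
    static boolean isSet(final String value) {
        return value != null && !value.trim().isEmpty();
    }

    /**
     * Checks the two mutually exclusive settings.
     * Returns a list of problems; an empty list means the configuration is acceptable.
     */
    static List<String> validate(final String schemaFile, final String schemaBody) {
        final List<String> problems = new ArrayList<>();
        final boolean fileSet = isSet(schemaFile);
        final boolean bodySet = isSet(schemaBody);

        if (fileSet && bodySet) {
            problems.add("Only one of Schema File or Schema Body may be set");
        } else if (!fileSet && !bodySet) {
            problems.add("One of Schema File or Schema Body must be set");
        }
        return problems;
    }
}
```

Returning a list of problems instead of a boolean keeps the reason for a rejection available to whoever displays it, which is the shape a processor-level validator usually needs.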

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79778601

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          — End diff –

          Sure thing, I wasn't aware of that.

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79778806

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + JSONObject jsonObject = new JSONObject(jsonTokener);
          + Schema schema = SchemaLoader.load(jsonObject);
          + this.schemaRef.set(schema);
          + }
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + final List<FlowFile> flowFiles = session.get(50);
          — End diff –

          Did some changes; not sure about that, but I hope it does what you meant.

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79778851

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + JSONObject jsonObject = new JSONObject(jsonTokener);
          + Schema schema = SchemaLoader.load(jsonObject);
          + this.schemaRef.set(schema);
          + }
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + final List<FlowFile> flowFiles = session.get(50);
          + if (flowFiles.isEmpty()) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + for (final FlowFile flowFile : flowFiles) {
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          + try {
          + String str = IOUtils.toString(in, StandardCharsets.UTF_8);
          + if (str.startsWith("[")) {
          + schema.validate(new JSONArray(str)); // throws a ValidationException if this object is invalid
          + } else {
          + schema.validate(new JSONObject(str)); // throws a ValidationException if this object is invalid
          + }
          + } catch (final IllegalArgumentException | ValidationException e) {
          + valid.set(false);
          + logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
          + }
          + }
          + });
          +
          + if (valid.get()) {
          + logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
          — End diff –

          done

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79778863

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags( {"json", "schema", "validation"} )
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + JSONObject jsonObject = new JSONObject(jsonTokener);
          + Schema schema = SchemaLoader.load(jsonObject);
          + this.schemaRef.set(schema);
          + }
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + final List<FlowFile> flowFiles = session.get(50);
          + if (flowFiles.isEmpty()) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + for (final FlowFile flowFile : flowFiles) {
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          + try {
          + String str = IOUtils.toString(in, StandardCharsets.UTF_8);
          + if (str.startsWith("[")) {
          + schema.validate(new JSONArray(str)); // throws a ValidationException if this object is invalid
          + } else {
          + schema.validate(new JSONObject(str)); // throws a ValidationException if this object is invalid
          + }
          + } catch (final IllegalArgumentException | ValidationException e) {
          + valid.set(false);
          + logger.debug("Failed to validate {} against schema due to {}", new Object[] {flowFile, e} );
          + }
          + }
          + });
          +
          + if (valid.get()) {
          + logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[] {flowFile});
          + session.getProvenanceReporter().route(flowFile, REL_VALID);
          + session.transfer(flowFile, REL_VALID);
          + } else {
          + logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile} );
          — End diff –

          done

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79778904

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags( {"json", "schema", "validation"} )
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          — End diff –

          sure thing, wasn't aware of that

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79779153

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags( {"json", "schema", "validation"} )
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          — End diff –

          Thank you for your detailed suggestion, it helped a lot!

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79779583

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,159 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("Schema File")
          + .description("The path to the Schema file that is to be used for validation")
          + .required(true)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + JSONObject jsonObject = new JSONObject(jsonTokener);
          + Schema schema = SchemaLoader.load(jsonObject);
          + this.schemaRef.set(schema);
          + }
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + final List<FlowFile> flowFiles = session.get(50);
          + if (flowFiles.isEmpty()) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + for (final FlowFile flowFile : flowFiles) {
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          — End diff –

          Done. Added routing to the invalid relationship.

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on the issue:

          https://github.com/apache/nifi/pull/1037

          Thank you for your insight. I pushed a new commit to the branch.

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79830777

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml —
          @@ -249,6 +249,11 @@ language governing permissions and limitations under the License. -->
          <artifactId>super-csv</artifactId>
          <version>2.4.0</version>
          </dependency>
          + <dependency>
          + <groupId>org.everit.json</groupId>
          + <artifactId>org.everit.json.schema</artifactId>
          + <version>1.4.0</version>
          + </dependency>
          — End diff –

          In the pom file, you also need to list the JSON files you added for testing so that they are excluded from the RAT check (licensing). You can verify that everything is fine by running a mvn clean install with the contrib-check profile:
          ```
          mvn clean install -Pcontrib-check
          ```
          Side note: where do the JSON files you are using for the tests come from? (Just to be sure there is no issue.)

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79833336

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java —
          @@ -0,0 +1,79 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.IOException;
          +import java.nio.file.Paths;
          +
          +import org.apache.commons.io.IOUtils;
          +
          +import org.apache.nifi.util.TestRunner;
          +import org.apache.nifi.util.TestRunners;
          +
          +import org.junit.Test;
          +import org.xml.sax.SAXException;
          +
          +public class TestValidateJson {
          — End diff –

          Could you add one or two unit tests for the invalid case?

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79831139

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,202 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.Map;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.util.StringUtils;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("validate-json-schema-file")
          + .displayName("Schema File")
          + .description("The path to the Schema file that is to be used for validation. Only one of Schema File or Schema Body may be used")
          + .required(false)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final PropertyDescriptor SCHEMA_BODY = new PropertyDescriptor.Builder()
          + .name("validate-json-schema-body")
          + .displayName("Schema Body")
          + .required(false)
          + .description("Json Schema Body that is to be used for validation. Only one of Schema File or Schema Body may be used")
          + .expressionLanguageSupported(false)
          + .addValidator(Validator.VALID)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + /**
          + * Custom validation for ensuring exactly one of Schema File or Schema Body is populated
          + *
          + * @param validationContext provides a mechanism for obtaining externally
          + * managed values, such as property values and supplies convenience methods
          + * for operating on those values
          + * @return A collection of validation results
          + */
          + @Override
          + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
          + Set<ValidationResult> results = new HashSet<>();
          +
          + // Verify that exactly one of "Schema File" or "Schema Body" is set
          + Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
          + if (StringUtils.isEmpty(propertyMap.get(SCHEMA_FILE)) == StringUtils.isEmpty(propertyMap.get(SCHEMA_BODY))) {
          + results.add(new ValidationResult.Builder().valid(false).explanation(
          + "Exactly one of Schema File or Schema Body must be set").build());
          + }
          +
          + return results;
          + }
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + properties.add(SCHEMA_BODY);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + JSONObject jsonObjectSchema;
          + if(context.getProperty(SCHEMA_FILE).isSet()){
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + jsonObjectSchema = new JSONObject(jsonTokener);
          + }
          + } else {
          + String rawSchema = context.getProperty(SCHEMA_BODY).getValue();
          + jsonObjectSchema = new JSONObject(rawSchema);
          + }
          + Schema schema = SchemaLoader.load(jsonObjectSchema);
          + this.schemaRef.set(schema);
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          — End diff –

          You can remove the IOException now
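
          The review note above relies on a general Java rule: a method that overrides or implements another may declare fewer checked exceptions than the method it overrides. The following is a minimal sketch of that rule only, not the processor's code. `StreamCallback`, `read`, and `runExample` are hypothetical names introduced for illustration; `StreamCallback` merely stands in for a callback interface whose method declares `IOException`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThrowsClauseSketch {

    // Hypothetical stand-in for a callback interface whose method declares IOException.
    interface StreamCallback {
        void process(InputStream in) throws IOException;
    }

    // Invokes the callback. The caller must still handle the exception declared
    // by the interface, even if a given implementation never throws it.
    static void read(InputStream in, StreamCallback callback) {
        try {
            callback.process(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the callback was invoked.
    static boolean runExample() {
        final AtomicBoolean called = new AtomicBoolean(false);
        read(new ByteArrayInputStream(new byte[0]), new StreamCallback() {
            @Override
            public void process(final InputStream in) { // "throws IOException" omitted
                called.set(true);
            }
        });
        return called.get();
    }

    public static void main(String[] args) {
        System.out.println(runExample());
    }
}
```

          Dropping the clause is only possible when nothing in the method body throws a checked `IOException`; otherwise the compiler rejects the change.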

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on the issue:

          https://github.com/apache/nifi/pull/1037

          For other reviewers, a template to test the processor:
          https://gist.github.com/pvillard31/0d8d0d3122ecdd179cc39fbd83d4b9c4

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79849108

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml —
          @@ -249,6 +249,11 @@ language governing permissions and limitations under the License. -->
          <artifactId>super-csv</artifactId>
          <version>2.4.0</version>
          </dependency>
          + <dependency>
          + <groupId>org.everit.json</groupId>
          + <artifactId>org.everit.json.schema</artifactId>
          + <version>1.4.0</version>
          + </dependency>
          — End diff –

          ahh, I see now. Corrected.

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79849248

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java —
          @@ -0,0 +1,79 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.IOException;
          +import java.nio.file.Paths;
          +
          +import org.apache.commons.io.IOUtils;
          +
          +import org.apache.nifi.util.TestRunner;
          +import org.apache.nifi.util.TestRunners;
          +
          +import org.junit.Test;
          +import org.xml.sax.SAXException;
          +
          +public class TestValidateJson {
          — End diff –

          Sure thing, added

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1037#discussion_r79849231

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java —
          @@ -0,0 +1,202 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +import java.io.File;
          +import java.io.FileInputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.charset.StandardCharsets;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.Map;
          +import java.util.concurrent.atomic.AtomicBoolean;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import org.apache.commons.io.IOUtils;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.InputStreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.util.StringUtils;
          +
          +import org.everit.json.schema.Schema;
          +import org.everit.json.schema.ValidationException;
          +import org.everit.json.schema.loader.SchemaLoader;
          +import org.json.JSONArray;
          +import org.json.JSONObject;
          +import org.json.JSONTokener;
          +
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@InputRequirement(Requirement.INPUT_REQUIRED)
          +@Tags({"json", "schema", "validation"})
          +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
          +public class ValidateJson extends AbstractProcessor {
          +
          + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
          + .name("validate-json-schema-file")
          + .displayName("Schema File")
          + .description("The path to the Schema file that is to be used for validation. Only one of Schema File or Schema Body may be used")
          + .required(false)
          + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
          + .build();
          +
          + public static final PropertyDescriptor SCHEMA_BODY = new PropertyDescriptor.Builder()
          + .name("validate-json-schema-body")
          + .displayName("Schema Body")
          + .required(false)
          + .description("Json Schema Body that is to be used for validation. Only one of Schema File or Schema Body may be used")
          + .expressionLanguageSupported(false)
          + .addValidator(Validator.VALID)
          + .build();
          +
          + public static final Relationship REL_VALID = new Relationship.Builder()
          + .name("valid")
          + .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
          + .build();
          + public static final Relationship REL_INVALID = new Relationship.Builder()
          + .name("invalid")
          + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
          +
          + /**
          + * Custom validation for ensuring exactly one of Script File or Script Body is populated
          + *
          + * @param validationContext provides a mechanism for obtaining externally
          + * managed values, such as property values and supplies convenience methods
          + * for operating on those values
          + * @return A collection of validation results
          + */
          + @Override
          + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
          + Set<ValidationResult> results = new HashSet<>();
          +
          + // Verify that exactly one of "script file" or "script body" is set
          + Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
          + if (StringUtils.isEmpty(propertyMap.get(SCHEMA_FILE)) == StringUtils.isEmpty(propertyMap.get(SCHEMA_BODY))) {
          + results.add(new ValidationResult.Builder().valid(false).explanation(
          + "Exactly one of Schema File or Schema Body must be set").build());
          + }
          +
          + return results;
          + }
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(SCHEMA_FILE);
          + properties.add(SCHEMA_BODY);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_VALID);
          + relationships.add(REL_INVALID);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @OnScheduled
          + public void parseSchema(final ProcessContext context) throws IOException {
          + JSONObject jsonObjectSchema;
          + if(context.getProperty(SCHEMA_FILE).isSet()){
          + try(FileInputStream inputStream = new FileInputStream(new File(context.getProperty(SCHEMA_FILE).getValue()))) {
          + JSONTokener jsonTokener = new JSONTokener(inputStream);
          + jsonObjectSchema = new JSONObject(jsonTokener);
          + }
          + } else {
          + String rawSchema = context.getProperty(SCHEMA_BODY).getValue();
          + jsonObjectSchema = new JSONObject(rawSchema);
          + }
          + Schema schema = SchemaLoader.load(jsonObjectSchema);
          + this.schemaRef.set(schema);
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) {
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          + final Schema schema = schemaRef.get();
          + final ComponentLog logger = getLogger();
          +
          + final AtomicBoolean valid = new AtomicBoolean(true);
          + session.read(flowFile, new InputStreamCallback() {
          + @Override
          + public void process(final InputStream in) throws IOException {
          — End diff –

          Right, removed.
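
          The `customValidate` method quoted in the diff above flags the configuration as invalid when the two emptiness checks agree, that is, when both properties are empty or both are set. The sketch below isolates that "exactly one of" logic using only the standard library. `ExactlyOneOfSketch`, `exactlyOneSet`, and the local `isEmpty` helper are hypothetical names introduced here, and the helper is assumed (not confirmed) to behave like the `StringUtils.isEmpty` call in the diff.

```java
public class ExactlyOneOfSketch {

    // Local helper so the sketch needs no external library (assumed semantics:
    // null or zero-length counts as empty).
    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // True only when exactly one of the two values is set. The diff reports an
    // error in the opposite case, where the two emptiness checks are equal.
    static boolean exactlyOneSet(String schemaFile, String schemaBody) {
        return isEmpty(schemaFile) != isEmpty(schemaBody);
    }

    public static void main(String[] args) {
        System.out.println(exactlyOneSet("/path/to/schema.json", null)); // only the file is set
        System.out.println(exactlyOneSet(null, "{\"type\": \"object\"}")); // only the body is set
        System.out.println(exactlyOneSet(null, null));                    // neither is set
        System.out.println(exactlyOneSet("/path/to/schema.json", "{}"));  // both are set
    }
}
```

          Comparing the two booleans covers both failure cases in a single condition, which is why the diff needs only one validation message.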

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on the issue:

          https://github.com/apache/nifi/pull/1037

          LGTM, full build with contrib-check OK.
          @mattyb149 do you want to have another look?

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on the issue:

          https://github.com/apache/nifi/pull/1037

          @pvillard31 nope I'm good if you are, I hadn't realized we had started reviewing at the same time

          jira-bot ASF subversion and git services added a comment -

          Commit f11682202b57edab585967e033839800d4159f4e in nifi's branch refs/heads/master from Bartosz Woźniak
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=f116822 ]

          NIFI-1893 Add processor for validating JSON

          This closes #1037.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1037

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on the issue:

          https://github.com/apache/nifi/pull/1037

          @bartoszjkwozniak I merged it to master, thanks for your contribution!

          githubbot ASF GitHub Bot added a comment -

          Github user bartoszjkwozniak commented on the issue:

          https://github.com/apache/nifi/pull/1037

          @pvillard31 you're welcome! Thank you, and @mattyb149 too, for the valuable code review.

          joewitt Joseph Witt added a comment -

          Bartosz Woźniak, Pierre Villard, Matt Burgess: unfortunately we need to revert this commit/processor for now. I've reopened the JIRA. It depends on a library that is now considered Category X, and we also need to reflect these new libraries in our LICENSE/NOTICE. Everit, for example, needs a copyright reference in our NOTICE, and it is what pulls in the now Category X JSON dependency.

          joewitt Joseph Witt added a comment -

          I have reverted this in the NIFI-2991 work. It looks like a cool processor, so hopefully it can be reworked to use a different JSON library.

          jira-bot ASF subversion and git services added a comment -

          Commit 6696529c99b5b564b076e4012abff3a54755370b in nifi's branch refs/heads/master from Joseph Witt
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=6696529 ]

          Revert "NIFI-1893 Add processor for validating JSON"

          This reverts commit f11682202b57edab585967e033839800d4159f4e.

          Signed-off-by: jpercivall <JPercivall@apache.org>

          githubbot ASF GitHub Bot added a comment -

          Github user basvank commented on the issue:

          https://github.com/apache/nifi/pull/1037

          @pvillard31 (and @bartoszjkwozniak) is it possible that something went wrong when merging this feature, maybe because the checks failed? It seems the ValidateJson processor did not end up in the master branch or any of the releases (I'm on 1.1.1).

          githubbot ASF GitHub Bot added a comment -

          Github user pvillard31 commented on the issue:

          https://github.com/apache/nifi/pull/1037

          @basvank
          Due to licensing issues:
          https://issues.apache.org/jira/browse/NIFI-2991

          We unfortunately had to remove this processor... we are looking for a new solution using another library. However, in the meantime, you can manually build this processor using this PR, and add the generated NAR to your NiFi instance.

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on the issue:

          https://github.com/apache/nifi/pull/1037

          I believe it had to be removed because it uses, or depends on, JSON.org-licensed JARs, and that license was determined to be Category X for the Apache Software Foundation. It was removed in #1230 under NIFI-2991 (https://issues.apache.org/jira/browse/NIFI-2991). I encourage you to change the underlying JSON libraries so that the licensing is ASF-friendly; then we can review and merge the updated PR.

          githubbot ASF GitHub Bot added a comment -

          Github user basvank commented on the issue:

          https://github.com/apache/nifi/pull/1037

          @pvillard31 @mattyb149 thanks for the information; that wasn't clear from this PR.
          It seems to be possible to implement in an ASF-friendly way using https://github.com/daveclayton/json-schema-validator (ASL 2.0), which depends on https://github.com/FasterXML/jackson-core (Apache 2.0) instead of json.org.
          I will see if I can find the time to do this myself, but if not it can serve as a pointer for others.


            People

            • Assignee:
              Unassigned
              Reporter:
              mattyb149 Matt Burgess
            • Votes:
              1
              Watchers:
              6

              Dates

              • Created:
                Updated:

                Development