Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Resolved
    • Affects Version/s: None
    • Fix Version/s: 1.0.0, 0.7.0
    • Component/s: None
    • Labels:
      None

      Description

      Creating a separate issue to track these, since a pull request has been submitted for the related issue NIFI-356.

      These processors, also backed by JsonPath, should support the following operations through user-defined properties:

      • Add - identify path and add key/value pair
        • If the path is an array, ignore the specified name and just add the value to the collection
      • Remove - delete the element at the specified path
      • Update - change the value for the given path to a provided value
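      The Add/Remove/Update operations above can be sketched in Java over an already-parsed JSON tree made of nested Maps and Lists. This is a minimal illustration under stated assumptions: `JsonMutations` is a hypothetical helper, and it uses simple dot-separated paths such as `store.books` rather than real JsonPath expressions (no wildcards, filters, or array indexes). It does not use the JsonPath library's API.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: Add/Remove/Update on a parsed JSON tree built from Maps and Lists.
// Paths are simple dot-separated keys ("store.books"), not JsonPath expressions.
final class JsonMutations {
    private JsonMutations() {}

    // Follows each key from the root; an empty path addresses the root itself.
    @SuppressWarnings("unchecked")
    static Object resolve(Map<String, Object> root, String path) {
        Object node = root;
        if (path.isEmpty()) {
            return node;
        }
        for (String key : path.split("\\.")) {
            if (!(node instanceof Map)) {
                throw new IllegalArgumentException("Not a JSON object before key: " + key);
            }
            node = ((Map<String, Object>) node).get(key);
        }
        return node;
    }

    // Add: put name/value into an object; for an array, ignore the name and append the value.
    @SuppressWarnings("unchecked")
    static void add(Map<String, Object> root, String path, String name, Object value) {
        Object target = resolve(root, path);
        if (target instanceof List) {
            ((List<Object>) target).add(value);
        } else if (target instanceof Map) {
            ((Map<String, Object>) target).put(name, value);
        } else {
            throw new IllegalArgumentException("Path is not an object or array: " + path);
        }
    }

    // Remove: delete the key named by the last path segment from its parent object.
    @SuppressWarnings("unchecked")
    static void remove(Map<String, Object> root, String path) {
        Object parent = resolve(root, parentPath(path));
        if (parent instanceof Map) {
            ((Map<String, Object>) parent).remove(lastKey(path));
        }
    }

    // Update: replace the value at a path that already exists.
    @SuppressWarnings("unchecked")
    static void update(Map<String, Object> root, String path, Object value) {
        Object parent = resolve(root, parentPath(path));
        String key = lastKey(path);
        if (!(parent instanceof Map) || !((Map<String, Object>) parent).containsKey(key)) {
            throw new IllegalArgumentException("Nothing to update at: " + path);
        }
        ((Map<String, Object>) parent).put(key, value);
    }

    // "a.b.c" -> "a.b"; "a" -> "" (the root).
    private static String parentPath(String path) {
        int dot = path.lastIndexOf('.');
        return dot < 0 ? "" : path.substring(0, dot);
    }

    // "a.b.c" -> "c"; "a" -> "a".
    private static String lastKey(String path) {
        return path.substring(path.lastIndexOf('.') + 1);
    }
}
```

      Update deliberately refuses to create a missing key, which keeps it distinct from Add; a real processor would need to decide how strict each operation should be.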

      Need to determine whether objects/arrays make sense as values, or whether they are needed at all.

      While it would be nice to be able to execute several operations per processor instance, it may be hard to capture all the information needed for multiple operations in a single processor configuration in a user-friendly way.


          Activity

          aldrin Aldrin Piri added a comment -

          Consider using JOLT: https://github.com/bazaarvoice/jolt

          githubbot ASF GitHub Bot added a comment -

          GitHub user YolandaMDavis opened a pull request:

          https://github.com/apache/nifi/pull/353

          NIFI-361 - Create Processors to mutate JSON data

          This is an initial implementation of the TransformJSON processor using the Jolt library. TransformJSON supports Jolt specifications for the following transformations: Chain, Shift, Remove, and Default. Users will be able to add the TransformJSON processor, select the transformation they wish to apply and enter the specification for the given transformation.

          Details for creating Jolt specifications can be found [here](https://github.com/bazaarvoice/jolt)
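          The flow described above (pick one transformation, supply its specification, apply it to the FlowFile content) can be sketched with stdlib-only stand-ins. Everything here is hypothetical: `JsonTransform` and `SimpleTransforms` are not Jolt classes, the Default and Remove steps only handle top-level keys, and Shift is omitted because its spec language is much richer. The point is only to show how Chain composes the other transformations in order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a selectable transformation; NOT Jolt's Transform interface.
interface JsonTransform {
    Map<String, Object> transform(Map<String, Object> input);
}

// Simplified top-level-only versions of Default, Remove and Chain (Shift omitted).
final class SimpleTransforms {
    private SimpleTransforms() {}

    // "Default": add each default key/value where the input has no non-null value for it.
    static JsonTransform defaults(Map<String, Object> defaults) {
        return input -> {
            Map<String, Object> out = new LinkedHashMap<>(input);
            defaults.forEach(out::putIfAbsent);
            return out;
        };
    }

    // "Remove": drop the listed top-level keys.
    static JsonTransform remove(Set<String> keys) {
        return input -> {
            Map<String, Object> out = new LinkedHashMap<>(input);
            out.keySet().removeAll(keys);
            return out;
        };
    }

    // "Chain": run each step in order, feeding each output into the next step.
    static JsonTransform chain(List<JsonTransform> steps) {
        List<JsonTransform> ordered = new ArrayList<>(steps);
        return input -> {
            Map<String, Object> current = input;
            for (JsonTransform step : ordered) {
                current = step.transform(current);
            }
            return current;
        };
    }
}
```

          Each step copies its input, so the original document is never mutated; that mirrors the PR's behavior of writing transformed content to a new FlowFile rather than editing in place.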

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/YolandaMDavis/nifi NIFI-361

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/353.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #353


          commit 3d2c2acae11823dc98cdc06c5066720216185984
          Author: Yolanda M. Davis <ydavis@hortonworks.com>
          Date: 2016-04-14T12:19:41Z

          NIFI-361 Initial implementation of TransformJSON using Jolt

          commit 3dbe30c7d5799e17c4cb8727f379de4ac36fca65
          Author: Yolanda M. Davis <ydavis@hortonworks.com>
          Date: 2016-04-14T12:20:02Z

          NIFI-361 Documentation entry and license update


          YolandaMDavis Yolanda M. Davis added a comment -

          Created a pull request for an implementation using Jolt (see https://github.com/apache/nifi/pull/353). The processor, TransformJSON, currently supports entry of Chain, Shift, Default, and Remove specifications. I am looking to potentially include Sort as well, which would be similar to the Apache Camel implementation.
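          Assuming Sort means reordering object keys so that serialized output is deterministic, a minimal stdlib-only sketch could recursively rebuild each object with its keys in alphabetical order. `KeySorter` is a hypothetical name, and Jolt's own Sortr may apply additional ordering rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical "Sort" step: rebuilds every JSON object with keys in natural String order.
// Array element order is preserved; only object keys are reordered.
final class KeySorter {
    private KeySorter() {}

    @SuppressWarnings("unchecked")
    static Object sortKeys(Object node) {
        if (node instanceof Map) {
            Map<String, Object> sorted = new TreeMap<>();
            for (Map.Entry<String, Object> e : ((Map<String, Object>) node).entrySet()) {
                sorted.put(e.getKey(), sortKeys(e.getValue()));
            }
            return sorted;
        }
        if (node instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object item : (List<Object>) node) {
                out.add(sortKeys(item));
            }
            return out;
        }
        return node; // strings, numbers, booleans and null are returned unchanged
    }
}
```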

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis closed the pull request at:

          https://github.com/apache/nifi/pull/353

          githubbot ASF GitHub Bot added a comment -

          GitHub user YolandaMDavis opened a pull request:

          https://github.com/apache/nifi/pull/354

          NIFI-361 - Create Processors to mutate JSON data

          This is an initial implementation of the TransformJSON processor using the Jolt library. TransformJSON supports Jolt specifications for the following transformations: Chain, Shift, Remove, and Default. Users will be able to add the TransformJSON processor, select the transformation they wish to apply and enter the specification for the given transformation.

          Details for creating Jolt specifications can be found [here](https://github.com/bazaarvoice/jolt)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/YolandaMDavis/nifi NIFI-361

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/354.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #354


          commit 68b5d65de0e0b1787a45bea7bcc50fdb2b625655
          Author: Yolanda M. Davis <ydavis@hortonworks.com>
          Date: 2016-04-14T12:19:41Z

          NIFI-361 Updates to test for processor including latest master merge

          commit 236471961bb577768167d3f16fd99bda3f2f1a54
          Author: Yolanda M. Davis <yolanda.m.davis@gmail.com>
          Date: 2016-04-14T20:18:54Z

          NIFI-361 add missing asterisk to documentation


          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r59997274

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          +
          +
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          — End diff –

          @YolandaMDavis I may be wrong or new, but it appears this is your first contribution, and if so, welcome to Apache NiFi!

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r59997330

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          — End diff –

          Now the comments:
          I know what you did is a common pattern, but we have discovered certain issues with the above pattern (multiple executions of the init method), and therefore we would recommend a slightly different approach. I could explain, but you can also take a look at one of the current PRs from a fellow contributor, https://github.com/apache/nifi/pull/360/files#diff-2b062238aeda341ddc1dc8c697e53bb5R106, where descriptors and relationships are set in a static init block. Let me know if you have any questions or need more help.
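          A minimal sketch of the static-init pattern recommended above, using plain Strings as hypothetical stand-ins for NiFi's `PropertyDescriptor` and `Relationship` so that it compiles without the NiFi API. The collections are built once, when the class is initialized, so running instance-level initialization more than once cannot rebuild or duplicate them. The getters play the role of the processor's `getSupportedPropertyDescriptors()` and `getRelationships()` overrides.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: Strings stand in for NiFi PropertyDescriptor/Relationship objects.
class StaticInitSketch {
    static final String JOLT_TRANSFORM = "Jolt Transformation";
    static final String JOLT_SPEC = "Jolt Specification";
    static final String REL_SUCCESS = "success";
    static final String REL_FAILURE = "failure";

    private static final List<String> PROPERTIES;
    private static final Set<String> RELATIONSHIPS;

    // Runs once when the class is initialized, not once per instance or per init call.
    static {
        List<String> props = new ArrayList<>();
        props.add(JOLT_TRANSFORM);
        props.add(JOLT_SPEC);
        PROPERTIES = Collections.unmodifiableList(props);

        Set<String> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(rels);
    }

    // Plays the role of the processor's getSupportedPropertyDescriptors() override.
    List<String> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    // Plays the role of the processor's getRelationships() override.
    Set<String> getRelationships() {
        return RELATIONSHIPS;
    }
}
```

          Wrapping the collections as unmodifiable means no instance can accidentally alter the shared, class-wide lists.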

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r59997381

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          +
          +
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(JOLT_TRANSFORM);
          + properties.add(JOLT_SPEC);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_SUCCESS);
          + relationships.add(REL_FAILURE);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          + } catch (final Exception e) {
          + throw new IOException(e);
          — End diff –

          Hmm, I must be missing something, but it appears that if there is an exception, it is rethrown as an IOException and no disposition on the FlowFile (success or failure) will ever happen. Is that intentional?
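          The control flow behind this question can be modeled outside NiFi. The following is a minimal, self-contained sketch under stated assumptions: `FakeSession`, `JsonCallback`, and the local `ProcessException` are hypothetical stand-ins, not NiFi classes, and the session is assumed to convert a callback's `IOException` into a `ProcessException`. It shows that the FlowFile gets a disposition only when `onTrigger` itself catches the exception and transfers to failure; if the exception simply escapes, no transfer is ever recorded.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the disposition question; none of these types are NiFi's.
public class DispositionSketch {

    // Local stand-in for NiFi's (unchecked) ProcessException.
    static class ProcessException extends RuntimeException {
        ProcessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Local stand-in for a StreamCallback body that may fail.
    interface JsonCallback {
        void process() throws IOException;
    }

    // Records which relationships the FlowFile was transferred to.
    static class FakeSession {
        final List<String> transfers = new ArrayList<>();

        // Assumption: the session rethrows a callback IOException as ProcessException.
        void write(JsonCallback callback) {
            try {
                callback.process();
            } catch (IOException ioe) {
                throw new ProcessException("IOException thrown from callback", ioe);
            }
        }

        void transfer(String relationship) {
            transfers.add(relationship);
        }
    }

    // Fails the way malformed JSON would, using the PR's catch/rethrow shape.
    static final JsonCallback FAILING = () -> {
        try {
            throw new IllegalArgumentException("not valid JSON");
        } catch (Exception e) {
            throw new IOException(e);
        }
    };

    // Catches the wrapped failure, so the FlowFile always gets a disposition.
    static void onTriggerWithRouting(FakeSession session, JsonCallback callback) {
        try {
            session.write(callback);
            session.transfer("success");
        } catch (ProcessException pe) {
            session.transfer("failure");
        }
    }

    // No catch: a failure escapes before any transfer is recorded.
    static void onTriggerWithoutRouting(FakeSession session, JsonCallback callback) {
        session.write(callback);
        session.transfer("success");
    }

    public static void main(String[] args) {
        FakeSession routed = new FakeSession();
        onTriggerWithRouting(routed, FAILING);
        System.out.println(routed.transfers); // prints [failure]

        FakeSession unrouted = new FakeSession();
        try {
            onTriggerWithoutRouting(unrouted, FAILING);
        } catch (ProcessException pe) {
            System.out.println("escaped with transfers " + unrouted.transfers); // prints escaped with transfers []
        }
    }
}
```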

          ozhurakousky Oleg Zhurakousky added a comment -

          The patch (PR) has been provided by a community member @YolandaMDavis

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60080948

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          +
          +
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(JOLT_TRANSFORM);
          + properties.add(JOLT_SPEC);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_SUCCESS);
          + relationships.add(REL_FAILURE);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          + } catch (final Exception e) {
          + throw new IOException(e);
          — End diff –

          I was following the pattern set by TransformXML. My thinking behind the catch/rethrow is to ensure that whatever exception is generated by JsonUtils or the Transform object is handled gracefully. Looking at StandardProcessSession as an example, the rethrown IOException appears to be caught later and rethrown as a ProcessException, which the processor's onTrigger then catches, logs, and handles by transferring the FlowFile to failure.
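          As a hedged illustration of the wrapping chain described in this reply, the sketch below uses a local stand-in `ProcessException` (not NiFi's class) and assumes the session wraps the callback's `IOException` the way described. It shows that the original parse or transform exception is still reachable through `getCause()` after being wrapped twice, so the processor can log the underlying reason before routing to failure. The error message is a made-up placeholder, not real JsonUtils output.

```java
import java.io.IOException;

// Hypothetical model of the exception chain; ProcessException is a local stand-in.
public class CauseChainSketch {

    static class ProcessException extends RuntimeException {
        ProcessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Callback body: wraps any failure in IOException, as the PR does.
    static void callback() throws IOException {
        try {
            // Placeholder for JsonUtils.jsonToObject(...) failing on bad input.
            throw new IllegalStateException("Unexpected character at position 0");
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    // Session write: assumed to rethrow a callback IOException as ProcessException.
    static void write() {
        try {
            callback();
        } catch (IOException ioe) {
            throw new ProcessException("IOException thrown from callback", ioe);
        }
    }

    // Follows getCause() to the innermost exception, e.g. for a log message.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        try {
            write();
        } catch (ProcessException pe) {
            // In the processor, this is where the error would be logged and the FlowFile sent to failure.
            Throwable root = rootCause(pe);
            System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
            // prints IllegalStateException: Unexpected character at position 0
        }
    }
}
```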

          Show
          githubbot ASF GitHub Bot added a comment - Github user YolandaMDavis commented on a diff in the pull request: https://github.com/apache/nifi/pull/354#discussion_r60080948 — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java — @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.StopWatch; + +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.Transform; +import com.bazaarvoice.jolt.JsonUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags( {"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"} ) 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. If the JSON transform " + + "fails, the original FlowFile is routed to the 'failure' relationship") +public class TransformJSON extends AbstractProcessor { + + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data."); + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations."); + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data."); + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data."); + + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() + .name("Jolt Specification") + .description("Jolt Specification for transform of JSON data.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(new JOLTSpecValidator()) + .required(true) + .build(); + + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder() + .name("Jolt Transformation") + .description("Specifies the Jolt Transformation that should be used with the provided specification.") + .required(true) + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR) + .defaultValue(CHAINR.getValue()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The FlowFile with transformed content will be routed to this relationship") + .build(); + public static 
          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60084343

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          +
          +
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(JOLT_TRANSFORM);
          + properties.add(JOLT_SPEC);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_SUCCESS);
          + relationships.add(REL_FAILURE);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          + } catch (final Exception e) {
          + throw new IOException(e);
          — End diff –

          Fair enough, but then you also have a higher-level try/catch block where you also catch ProcessException, so I am still not sure that can ever happen. In any event, using old code as an example is a good start, but we are all human, which means old code may have mistakes or things that could be improved; looking at it is fine, but questioning it is even better.
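
The question in this exchange is whether the outer `catch (ProcessException e)` in `onTrigger` can ever be reached. The plain-Java sketch below models that flow under one assumption taken from the earlier reply: the session rethrows an `IOException` raised inside the callback as a `ProcessException`. `ProcessExceptionSketch`, `CallbackSketch`, `write` and `onTrigger` are hypothetical stand-ins, not NiFi or Jolt classes, so the real behavior should still be confirmed against `StandardProcessSession`.

```java
import java.io.IOException;
import java.util.function.Function;

/** Plain-Java model of the discussed exception flow; every type here is hypothetical. */
public class ExceptionFlowSketch {

    /** Stand-in for NiFi's unchecked ProcessException. */
    public static class ProcessExceptionSketch extends RuntimeException {
        public ProcessExceptionSketch(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in for StreamCallback: the callback may throw IOException. */
    public interface CallbackSketch {
        String process(String in) throws IOException;
    }

    /**
     * Stand-in for session.write(...). ASSUMPTION (from the earlier reply, not verified
     * here): an IOException thrown by the callback is rethrown as a ProcessException.
     */
    static String write(String content, CallbackSketch callback) {
        try {
            return callback.process(content);
        } catch (IOException e) {
            throw new ProcessExceptionSketch(e);
        }
    }

    /** Mirrors the try/catch shape of onTrigger in the diff and returns the route taken. */
    public static String onTrigger(String content, Function<String, String> transform) {
        try {
            write(content, in -> {
                try {
                    return transform.apply(in);
                } catch (RuntimeException e) {
                    // The diff catches Exception; a Function can only throw unchecked ones.
                    throw new IOException(e);
                }
            });
            return "success";
        } catch (ProcessExceptionSketch e) {
            // Reachable only because write() converts the callback's IOException.
            return "failure";
        }
    }

    public static void main(String[] args) {
        System.out.println(onTrigger("{}", s -> s)); // success
        System.out.println(onTrigger("not json", s -> {
            throw new IllegalArgumentException("not valid JSON");
        })); // failure
    }
}
```

Because `write` declares no checked exception, the callback's `IOException` has to surface as an unchecked type; in this model that type is `ProcessExceptionSketch`, which is what makes the outer `catch` reachable.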

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60084831

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —

          What I would also recommend is using code coverage tools. Not sure which IDE you are using, but if it is Eclipse, EclEmma comes with it, and by running your tests through it you'll be able to see exactly how much of your code was executed. That should help you either force a scenario in your unit tests for a particular condition or question whether that condition is actually valid.
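
Acting on the coverage suggestion means writing at least one test whose input reaches the failure branch. The sketch below is a hypothetical, JDK-only illustration rather than a NiFi test: `route` stands in for the processor's routing and records which exit ran (much as a coverage tool marks executed lines), and `requireJsonObject` is a deliberately crude validity check, not a JSON parser. Against the real processor, the same idea would be expressed with NiFi's `TestRunner` from the nifi-mock module by enqueueing invalid JSON and asserting that the FlowFile is routed to `failure`.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical illustration: a happy-path-only test never executes the failure exit. */
public class FailurePathSketch {

    /** The two exits of the hypothetical routing logic. */
    public enum Route { SUCCESS, FAILURE }

    /** Stand-in for onTrigger's routing; records which exit ran in {@code executed}. */
    public static Route route(String content, Set<Route> executed) {
        Route result;
        try {
            requireJsonObject(content);
            result = Route.SUCCESS;
        } catch (IllegalArgumentException e) {
            result = Route.FAILURE;
        }
        executed.add(result);
        return result;
    }

    /** Deliberately crude check (assumption): only text wrapped in braces passes. */
    static void requireJsonObject(String s) {
        String t = s.trim();
        if (!(t.startsWith("{") && t.endsWith("}"))) {
            throw new IllegalArgumentException("not a JSON object: " + s);
        }
    }

    public static void main(String[] args) {
        Set<Route> executed = EnumSet.noneOf(Route.class);
        route("{\"rating\": 3}", executed);
        System.out.println("happy path only: " + executed);    // [SUCCESS]
        route("this is not JSON", executed);
        System.out.println("plus invalid input: " + executed); // [SUCCESS, FAILURE]
    }
}
```

With only the valid input, the `FAILURE` exit never runs, which is exactly the gap a coverage report would highlight.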

          Show
          githubbot ASF GitHub Bot added a comment - Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/354#discussion_r60084831 — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java — @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.StopWatch; + +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.Transform; +import com.bazaarvoice.jolt.JsonUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags( {"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"} ) 
          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60115891

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          +
          +
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(JOLT_TRANSFORM);
          + properties.add(JOLT_SPEC);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_SUCCESS);
          + relationships.add(REL_FAILURE);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          + } catch (final Exception e) {
          + throw new IOException(e);
          — End diff –
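The quoted hunk stops inside onTrigger, so it does not show where the transform field is assigned. Given the OnScheduled import and the Shiftr/Removr/Chainr/Defaultr imports, it is presumably built once from the "Jolt Transformation" and "Jolt Specification" properties when the processor is scheduled. The following is a minimal, dependency-free sketch of that selection step, keyed on the allowable values "Shift", "Chain", "Default" and "Remove" from the diff. JsonTransform, fromSelection and the four builders are hypothetical, simplified stand-ins that only handle literal top-level keys; they are not the Jolt API and not the PR's implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for Jolt's Shiftr/Defaultr/Removr/Chainr.
// Only literal, top-level keys of a JSON object (a Map) are handled; real Jolt
// specs also support nesting, wildcards and arrays.
final class TransformSelectionSketch {

    /** Same shape as Jolt's Transform interface: Object in, Object out. */
    interface JsonTransform {
        Object transform(Object input);
    }

    /** Builds one transform from the selected "Jolt Transformation" value and its spec. */
    @SuppressWarnings("unchecked")
    static JsonTransform fromSelection(String selection, Object spec) {
        switch (selection) {
            case "Shift":   return shift((Map<String, Object>) spec);
            case "Default": return defaults((Map<String, Object>) spec);
            case "Remove":  return remove((Map<String, Object>) spec);
            case "Chain":   return chain((List<Map<String, Object>>) spec);
            default: throw new IllegalArgumentException("Unknown transformation: " + selection);
        }
    }

    // Shift: copy input[key] to output[spec[key]]; keys not named in the spec are dropped.
    static JsonTransform shift(Map<String, Object> spec) {
        return input -> {
            Map<String, Object> in = asMap(input);
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<String, Object> e : spec.entrySet()) {
                if (in.containsKey(e.getKey())) {
                    out.put((String) e.getValue(), in.get(e.getKey()));
                }
            }
            return out;
        };
    }

    // Default: add spec entries only where the input has no value for that key.
    static JsonTransform defaults(Map<String, Object> spec) {
        return input -> {
            Map<String, Object> out = new LinkedHashMap<>(asMap(input));
            spec.forEach(out::putIfAbsent);
            return out;
        };
    }

    // Remove: delete every key named in the spec (values are ignored, e.g. {"key": ""}).
    static JsonTransform remove(Map<String, Object> spec) {
        return input -> {
            Map<String, Object> out = new LinkedHashMap<>(asMap(input));
            out.keySet().removeAll(spec.keySet());
            return out;
        };
    }

    // Chain: entries look like {"operation": "shift"|"default"|"remove", "spec": {...}};
    // steps run in order, each consuming the previous step's output.
    @SuppressWarnings("unchecked")
    static JsonTransform chain(List<Map<String, Object>> steps) {
        return input -> {
            Object current = input;
            for (Map<String, Object> step : steps) {
                String op = (String) step.get("operation");
                Map<String, Object> stepSpec = (Map<String, Object>) step.get("spec");
                JsonTransform t;
                switch (op) {
                    case "shift":   t = shift(stepSpec); break;
                    case "default": t = defaults(stepSpec); break;
                    case "remove":  t = remove(stepSpec); break;
                    default: throw new IllegalArgumentException("Unsupported operation: " + op);
                }
                current = t.transform(current);
            }
            return current;
        };
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> asMap(Object json) {
        if (!(json instanceof Map)) {
            throw new IllegalArgumentException("Expected a JSON object");
        }
        return (Map<String, Object>) json;
    }
}
```

For example, selecting "Remove" with the spec {"internalId": ""} drops that key and keeps the rest, whereas "Default" only fills in keys that are missing. In the quoted diff, the REMOVR value's description repeats the Defaultr text ("will apply default values"), although a remove transform deletes content rather than adding defaults.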

          Hi Oleg, based on your comments I used IntelliJ's coverage tools and confirmed that the code in question is reached. The test TestTransformJSON.testInvalidFlowFileContent triggers the exception. If the FlowFile content isn't JSON, the utility class I'm using to parse it (which is backed by Jackson) internally catches a JsonParseException and rethrows it as a RuntimeException. Had it let the JsonParseException through, this would have been fine, since JsonParseException extends IOException. I'm not quite sure why they wrapped it in a RuntimeException.
          Anyway, I confirmed that without the catch block the exception bypasses the ProcessException handling altogether and my test fails, since it checks that FlowFiles are routed to the failure relationship.
          I also added a couple of tests to bring coverage to 100%.

          Will commit and push changes shortly for review.
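The reasoning in the comment above comes down to which exception types get translated into a ProcessException. A minimal, dependency-free sketch, assuming (as the comment describes) that the session only turns an IOException thrown from the callback into a ProcessException. ProcessException, StreamCallback, write, parseJson and route below are hypothetical stand-ins, not the NiFi or Jolt APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins: ProcessException mimics NiFi's unchecked ProcessException,
// write() mimics a session that converts a callback's IOException into one, and
// parseJson() mimics a utility that wraps its parse error in a RuntimeException.
final class ExceptionRoutingSketch {

    static class ProcessException extends RuntimeException {
        ProcessException(Throwable cause) { super(cause); }
    }

    interface StreamCallback {
        void process(InputStream in) throws IOException;
    }

    // Only IOException is translated; any RuntimeException passes straight through.
    static void write(InputStream in, StreamCallback callback) {
        try {
            callback.process(in);
        } catch (IOException e) {
            throw new ProcessException(e);
        }
    }

    // Rejects anything that does not start like a JSON object, throwing an
    // unchecked exception whose cause is the (checked) parse error.
    static Object parseJson(InputStream in) throws IOException {
        String text = new String(in.readAllBytes(), StandardCharsets.UTF_8).trim();
        if (!text.startsWith("{")) {
            throw new RuntimeException(new IOException("Unexpected content: " + text));
        }
        return text;
    }

    /** Returns "success" or "failure", mirroring the processor's two relationships. */
    static String route(String content, boolean wrapInIOException) {
        InputStream in = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
        try {
            write(in, rawIn -> {
                if (wrapInIOException) {
                    try {
                        parseJson(rawIn);
                    } catch (final Exception e) {
                        // Same idea as the catch block in the diff: rethrow as IOException
                        // so that write() converts it into a ProcessException.
                        throw new IOException(e);
                    }
                } else {
                    parseJson(rawIn); // a RuntimeException escapes write() untouched
                }
            });
            return "success";
        } catch (ProcessException e) {
            return "failure";
        }
    }
}
```

With the wrapper, route("not json", true) returns "failure"; with wrapInIOException set to false, the RuntimeException propagates out of both write() and route(), so the failure routing is never reached, which matches the failing test described above.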

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60117078

          @YolandaMDavis perfect! Let me know when it's pushed and I'll review.

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60118046

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", "This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", "This JOLT transformation will remove values from the input JSON/data to create the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          — End diff –

          Assuming the above will become 'static final' (both) based on the suggested approach, correct?
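A minimal sketch of the change the reviewer is asking about, assuming the descriptor list and relationship set move from init() into static final fields built once in a static initializer. Plain strings stand in for NiFi's PropertyDescriptor and Relationship types (a hypothetical simplification so the sketch compiles on its own), and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the processor: strings replace NiFi's
// PropertyDescriptor and Relationship types so this compiles without NiFi.
class StaticDescriptorsSketch {

    static final String JOLT_TRANSFORM = "Jolt Transformation";
    static final String JOLT_SPEC = "Jolt Specification";
    static final String REL_SUCCESS = "success";
    static final String REL_FAILURE = "failure";

    // Built once when the class is initialized and shared by every instance,
    // instead of being rebuilt per instance in init().
    private static final List<String> PROPERTIES;
    private static final Set<String> RELATIONSHIPS;

    static {
        final List<String> properties = new ArrayList<>();
        properties.add(JOLT_TRANSFORM);
        properties.add(JOLT_SPEC);
        PROPERTIES = Collections.unmodifiableList(properties);

        final Set<String> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    public Set<String> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<String> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }
}
```

Because the collections are immutable and identical for every instance, sharing them statically avoids per-instance rebuilding, and class initialization makes them safely visible to all threads.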

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60118214

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", "This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", "This JOLT transformation will remove values from the input JSON/data to create the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          — End diff –

          This also needs to be 'volatile' to ensure visibility across threads.
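To illustrate the visibility concern, here is a minimal sketch, assuming the transform is built once on a scheduling thread (as an @OnScheduled method would) and then read by onTrigger on other worker threads. The class, its methods, and the String-based transform are hypothetical stand-ins, not the processor's actual code.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the processor's transform handling.
class VolatileTransformSketch {

    // volatile makes the write in onScheduled() visible to later reads on other
    // threads; without it, the Java memory model does not guarantee that a
    // worker thread ever observes the new reference.
    private volatile UnaryOperator<String> transform;

    // Mirrors an @OnScheduled method: build the transform from configuration.
    void onScheduled(final String suffix) {
        transform = s -> s + suffix;
    }

    // Mirrors onTrigger: may run concurrently on several threads.
    String onTrigger(final String input) {
        final UnaryOperator<String> current = transform; // read the field once
        if (current == null) {
            throw new IllegalStateException("Processor has not been scheduled");
        }
        return current.apply(input);
    }
}
```

Copying the field into a local variable keeps each invocation working with one consistent transform even if the field is reassigned mid-call.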

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60164832

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,212 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", "This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", "This JOLT transformation will remove values from the input JSON/data to create the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private List<PropertyDescriptor> properties;
          + private Set<Relationship> relationships;
          + private Transform transform;
          +
          +
          + @Override
          + protected void init(ProcessorInitializationContext context) {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(JOLT_TRANSFORM);
          + properties.add(JOLT_SPEC);
          + this.properties = Collections.unmodifiableList(properties);
          +
          + final Set<Relationship> relationships = new HashSet<>();
          + relationships.add(REL_SUCCESS);
          + relationships.add(REL_FAILURE);
          + this.relationships = Collections.unmodifiableSet(relationships);
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          + } catch (final Exception e) {
          + throw new IOException(e);
          — End diff –

          @olegz pushed the latest with discussed changes.

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-211703487

          @YolandaMDavis this looks very good. One general comment: just to finish the lifecycle loop, I'd suggest adding a stop operation (a method annotated with @OnStopped) where, based on what I see, you would simply set the transformer to null, unless some stop/cleanup procedure is required (e.g., closing something).
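A minimal plain-Java sketch of the stop operation suggested above, assuming a hypothetical `ManagedTransform` interface in place of Jolt's `Transform` (Jolt's interface is not assumed to be closeable). In the processor itself the two lifecycle methods would carry NiFi's `@OnScheduled` and `@OnStopped` annotations; here they are ordinary methods so the sketch runs without NiFi on the classpath.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a JSON transform; not the Jolt API.
interface ManagedTransform extends UnaryOperator<Object> {}

final class StoppableProcessorSketch {
    // volatile so the reference cleared in onStopped() is seen by other threads
    private volatile ManagedTransform transform;

    // Would carry NiFi's @OnScheduled annotation in a real processor.
    void onScheduled(ManagedTransform configured) {
        transform = configured;
    }

    // Would carry NiFi's @OnStopped annotation: drop the reference, and
    // close it only if the implementation actually holds resources.
    void onStopped() {
        ManagedTransform current = transform;
        transform = null;
        if (current instanceof AutoCloseable) {
            try {
                ((AutoCloseable) current).close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close transform", e);
            }
        }
    }

    boolean isScheduled() {
        return transform != null;
    }
}
```

If the transform holds nothing that needs closing, the `instanceof` branch is skipped and `onStopped()` reduces to clearing the field.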

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60167411

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,211 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          — End diff –

          Something @alopresto pointed out on another, unrelated PR is a valid point, and we need to start making it a rule: use both name and displayName. I'll quote Andy's comment:
          ".name should be a machine-safe string (i.e. ssl-context-service) which will remain constant over the life of the processor because it is used for object resolution when loading from the flow.tar.gz file. For the human-readable value to display in the UI, please use .displayName. This can be changed without affecting object resolution (for future renaming, internationalization, etc.)."

          So, you can name the above property "jolt-spec" or something like that, yet keep the displayName "Jolt Specification". The benefit is that if at some point we find a more user-friendly name, we'll be able to change it at will, which is not something we can do with the current approach.
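In the processor, the change amounts to `.name("jolt-spec")` plus `.displayName("Jolt Specification")` on the builder. The self-contained model below, built from hypothetical `Descriptor` and `FlowConfig` classes rather than NiFi's API, illustrates why: saved values are resolved only by the machine-safe name, so the display label can change without orphaning existing configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a property descriptor: 'name' is the stable key
// persisted with the flow, 'displayName' is only what the UI shows.
final class Descriptor {
    final String name;
    final String displayName;

    Descriptor(String name, String displayName) {
        this.name = name;
        this.displayName = displayName;
    }
}

// Hypothetical model of persisted flow configuration.
final class FlowConfig {
    private final Map<String, String> savedValues = new HashMap<>();

    void save(Descriptor d, String value) {
        savedValues.put(d.name, value);
    }

    // Resolution uses only 'name', so relabelling displayName is harmless.
    String load(Descriptor d) {
        return savedValues.get(d.name);
    }
}
```

Renaming the `displayName` still finds the saved value, while changing `name` would not.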

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60167981

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,211 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          — End diff –

          Looking at the tests, I am now wondering if we should also support pointing to a file that contains the spec. If the JSON spec is simple it may be OK to just type it in, but even in your tests some specs are rather complex, and it may not be very convenient for the user to type them in. Also, the user may not be technically skilled enough to do that and may rely on a library of spec files. What do you think?
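One way the file option could work, sketched under the assumption of a single property that accepts either an inline spec or a path: a value starting with `{` or `[` is used as-is, and anything else is read from disk. `SpecSource.resolve` is a hypothetical helper, not part of the PR; the real processor would also need `JOLTSpecValidator` to resolve the value the same way before validating it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: not part of NiFi or the PR under review.
final class SpecSource {
    private SpecSource() {}

    // Assumed rule: a value that starts with '{' or '[' is an inline JSON
    // spec; anything else is treated as a path to a spec file.
    static String resolve(String propertyValue) {
        String trimmed = propertyValue.trim();
        if (trimmed.startsWith("{") || trimmed.startsWith("[")) {
            return trimmed;
        }
        Path path = Paths.get(trimmed);
        try {
            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            // Surface unreadable files as a clear configuration error.
            throw new UncheckedIOException("Cannot read Jolt spec file: " + path, e);
        }
    }
}
```

A separate "spec file" property would avoid the guessing rule entirely, at the cost of one more property to configure.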

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-211708711

          @YolandaMDavis @olegz some feedback.

          1. The com.bazaarvoice.jolt dependency does not appear to be accounted for in the LICENSE/NOTICE updates. Their LICENSE does indicate they would like reference made to 'Copyright 2013-2014 Bazaarvoice, Inc.' This is found in https://github.com/bazaarvoice/jolt/blob/1271e1919693e63ce1efdf534dfee0e0579d0b2f/LICENSE so we should propagate that into the NOTICE file as well. We also need to reference JOLT in our top-level nifi-assembly/NOTICE if it is new, and I'd assume that at least for Bazaarvoice it would be.

          2. I don't believe I follow the recommendation to change the member variable 'transform' to volatile or to set it to null on stopped. The variable appears to be set at the proper lifecycle point in NiFi and thus would not be reassigned in some non-thread-safe way. The only question then is whether its use itself could be non-thread-safe, and that does appear questionable to me. In that case marking the variable as volatile won't help at all anyway. If it is indeed not thread-safe, you will want to consider marking the class as executing serially, or you could have a pool of transforms, or you could transform batches at a time. Lots of ways to tackle it. Of course, if it is already thread-safe then you are fine, and in any event the volatile should be unnecessary. Finally, I am not aware of any reason to set it to null in @OnStopped or @OnUnscheduled, since if the processor were being removed from the flow it would get cleaned up anyway. If there were a need to call a close method or something similar, then yes, that would be good.
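If the Jolt transform turns out not to be thread-safe, the pooling option mentioned above could look roughly like this generic sketch. `InstancePool` is hypothetical, and sizing it to the processor's maximum concurrent tasks is an assumption. The serial alternative would be annotating the processor with NiFi's `@TriggerSerially`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical fixed-size pool: each thread borrows its own instance, so a
// non-thread-safe object is never used by two threads at the same time.
final class InstancePool<T> {
    private final BlockingQueue<T> idle;

    InstancePool(int size, Supplier<T> factory) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1");
        }
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Borrow an instance, run the work, and always hand the instance back.
    <R> R withInstance(Function<? super T, ? extends R> work) {
        T instance;
        try {
            instance = idle.take(); // blocks while every instance is in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for an instance", e);
        }
        try {
            return work.apply(instance);
        } finally {
            idle.offer(instance); // capacity equals pool size, so this succeeds
        }
    }

    int available() {
        return idle.size();
    }
}
```

The pool trades some memory for concurrency; `@TriggerSerially` is simpler but caps the processor at one thread.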

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-211718256

          @joewitt Thinking more about it, you have a point about @OnStopped, so I am retracting that comment. As for volatile, the question is the visibility of the transformer to other threads. If T1 executes @OnScheduled and sets the field, T2 may come in and still see null, not because of some race condition, but simply because the value set by T1 may not be visible to T2.
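The visibility concern can be illustrated with a minimal sketch that has no NiFi dependency. The class `ScheduledTransformHolder` and its methods are hypothetical stand-ins for a processor's `@OnScheduled` and `onTrigger` methods, and `UnaryOperator<String>` stands in for the Jolt transform. Under the Java memory model, a write to a plain field in one thread is not guaranteed to be seen by another thread unless something establishes a happens-before edge. A volatile write followed by a volatile read provides that edge. Whether the NiFi framework already provides one is the open question raised in the thread.

```java
import java.util.function.UnaryOperator;

// Hypothetical holder mimicking the processor lifecycle under discussion.
final class ScheduledTransformHolder {
    // volatile: the write in onScheduled() happens-before any later read in onTrigger()
    private volatile UnaryOperator<String> transform;

    // Called once by a framework thread, like @OnScheduled.
    void onScheduled(String prefix) {
        transform = s -> prefix + s; // object is fully built before the volatile write
    }

    // Called later, possibly by a different thread, like onTrigger().
    String onTrigger(String input) {
        final UnaryOperator<String> t = transform; // single volatile read
        if (t == null) {
            throw new IllegalStateException("onScheduled has not run yet");
        }
        return t.apply(input);
    }
}
```

Reading the field once into a local variable keeps the null check and the call consistent, even if the field were reassigned concurrently.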

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-211724253

          Oleg pointed out off-list the finer concerns with the member variable. While it seems pretty far out, it also seems accurate, based on reading https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html and https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html.

          And indeed, I cannot think of anything the framework does with thread management, or in how those lifecycle calls work, that would enforce or guarantee a happens-before relationship for these processor member variables. @markap14, can you?

          That said, the licensing issue and the thread safety of the transform itself still need to be addressed.

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-211726075

          I am less concerned about thread safety of the transforms themselves now. Jolt's docs do refer to this at the readme level https://github.com/bazaarvoice/jolt/tree/1271e1919693e63ce1efdf534dfee0e0579d0b2f#-performance.

          So that leaves only the concern about updating the NOTICE(s).

          githubbot ASF GitHub Bot added a comment -

          Github user davetorok commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-211909411

          We've been working with JOLT for a while within NiFi and appreciate having it supported as a standard processor! I have some suggestions.

          1. I would change the PR to support all the built-in transformations, including 'sort' and 'cardinality', which are not in the pull request.

          2. 90% of what we've seen uses ChainR, since it is a composition of one or more individual transforms, almost always "ShiftR + DefaultR". In fact, it is a little confusing to support options other than ChainR, since ChainR takes an array of individual transform specs whereas the others take a single spec. At minimum, I would make CHAINR the default.

          3. Jolt supports additional custom transforms via fully qualified Java class names. We use a couple. I would recommend adding documentation on how to support them via a drop-in jar, similar to how ExecuteScript supports additional resources. An example custom transform is here: https://github.com/zacheusz/jolt-date
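The structural difference described in point 2 can be sketched in a few lines. A Chainr spec is a JSON array of entries such as `[{"operation": "shift", "spec": {"rating": "Rating"}}]`, while Shiftr, Defaultr, and Removr each take only the inner `spec` object. The hypothetical helper below wraps a single-transform spec, in the parsed `List`/`Map` form, into a one-entry chain. This shows why a Chain-only option would lose no expressiveness for the built-in transforms. The helper is illustrative and is not part of Jolt's API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds the parsed form of a one-entry Chainr spec.
final class ChainSpecs {
    static List<Map<String, Object>> wrap(String operation, Map<String, Object> spec) {
        Map<String, Object> entry = new LinkedHashMap<>(); // keeps "operation" before "spec"
        entry.put("operation", operation); // e.g. "shift", "default", "remove"
        entry.put("spec", spec);           // the single-transform spec, unchanged
        return Collections.singletonList(entry);
    }
}
```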

          githubbot ASF GitHub Bot added a comment -

          Github user davetorok commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60224390

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,211 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private final static List<PropertyDescriptor> properties;
          + private final static Set<Relationship> relationships;
          + private volatile Transform transform;
          +
          + static {
          +
          + final List<PropertyDescriptor> _properties = new ArrayList<>();
          + _properties.add(JOLT_TRANSFORM);
          + _properties.add(JOLT_SPEC);
          + properties = Collections.unmodifiableList(_properties);
          +
          + final Set<Relationship> _relationships = new HashSet<>();
          + _relationships.add(REL_SUCCESS);
          + _relationships.add(REL_FAILURE);
          + relationships = Collections.unmodifiableSet(_relationships);
          +
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          — End diff –

          We ran into a bug with this very line! We fixed it with the following, but it probably needs to be flexible and take the charset from the input or via a property:

          `out.write(JsonUtils.toJsonString(transformedJson).getBytes(StandardCharsets.UTF_8));`
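The root cause is that `String.getBytes()` with no argument uses the JVM's default charset, so the same JSON can be encoded differently on different hosts. The following is a minimal sketch of the "take the charset via a property" idea. The `charsetName` parameter represents a hypothetical "Character Set" processor property that does not exist in the PR. An empty or missing value falls back to UTF-8, matching the fix quoted above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encode the transformed JSON with an explicit charset
// instead of relying on the platform default used by getBytes().
final class JsonEncoding {
    static byte[] encode(String json, String charsetName) {
        Charset cs = (charsetName == null || charsetName.isEmpty())
                ? StandardCharsets.UTF_8          // default when no property value is set
                : Charset.forName(charsetName);   // throws if the name is unsupported
        return json.getBytes(cs);
    }
}
```

In the processor, a property validator could reject unsupported charset names up front, so `Charset.forName` never fails inside `onTrigger`.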

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60297981

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,211 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"})
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          — End diff –

          Concerning pointing to a file: I think that would be good in the single-node model, but it could be more complicated in the cluster model (we would need to ensure the same file is available to all nodes processing the flow). Also, I plan on working through a UI change specific to this processor at a later time, which I think will be helpful to users with complex specs. One feature of that UI could be the ability to point to a local spec file and import it into the UI, which would then allow a user to edit it if they like or simply save it. From there, the imported spec could be saved as a property, as it is currently.

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60302366

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,211 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags( {"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"} )
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private final static List<PropertyDescriptor> properties;
          + private final static Set<Relationship> relationships;
          + private volatile Transform transform;
          +
          + static {
          +
          + final List<PropertyDescriptor> _properties = new ArrayList<>();
          + _properties.add(JOLT_TRANSFORM);
          + _properties.add(JOLT_SPEC);
          + properties = Collections.unmodifiableList(_properties);
          +
          + final Set<Relationship> _relationships = new HashSet<>();
          + _relationships.add(REL_SUCCESS);
          + _relationships.add(REL_FAILURE);
          + relationships = Collections.unmodifiableSet(_relationships);
          +
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          — End diff –

          Dave, thanks for highlighting this issue and for your suggestions! I will include Sort and Cardinality in the pull request. Concerning charset support, my plan is to ensure that TransformJSON supports UTF-8 at a minimum, so I will incorporate that change. For other charsets, I think using the ConvertCharacterSet processor in conjunction with TransformJSON will cover those concerns for now.

          I think custom transformation support is a great idea. My plan is to reach a stable point with the "stock" transforms first, before moving on to custom support.
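          The UTF-8 point concerns the `getBytes()` call in the quoted diff: `String.getBytes()` with no argument encodes with the JVM's default charset, so the bytes written can depend on how each JVM is configured. Below is a minimal sketch, assuming only the JDK (`StandardCharsets` is available from Java 7, which the 0.x branch targets), of reading and writing JSON text with an explicit UTF-8 charset. The class `Utf8JsonIo` and its methods are hypothetical names for illustration, not code from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

public class Utf8JsonIo {

    // Decode the whole stream as UTF-8 rather than the platform default charset.
    // The caller owns the stream and is responsible for closing it.
    static String readUtf8(InputStream in) throws IOException {
        Reader reader = new InputStreamReader(in, StandardCharsets.UTF_8);
        StringBuilder sb = new StringBuilder();
        char[] buf = new char[4096];
        int n;
        while ((n = reader.read(buf)) != -1) {
            sb.append(buf, 0, n);
        }
        return sb.toString();
    }

    // Encode explicitly as UTF-8; the no-argument String.getBytes() uses the JVM default charset.
    static void writeUtf8(String json, OutputStream out) throws IOException {
        out.write(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // U+00E9 (e with acute accent) is one char but two bytes in UTF-8.
        String json = "{\"name\":\"caf\u00e9\"}";
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUtf8(json, out);
        byte[] bytes = out.toByteArray();
        System.out.println(bytes.length); // prints 16
        String roundTrip = readUtf8(new ByteArrayInputStream(bytes));
        System.out.println(roundTrip.equals(json)); // prints true
    }
}
```

          Applied to the diff, the output side would become `getBytes(StandardCharsets.UTF_8)`; how the input side is decoded depends on the JSON parser and is not covered by this sketch.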

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/354#discussion_r60631450

          — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java —
          @@ -0,0 +1,211 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.processors.standard;
          +
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SideEffectFree;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.ValidationContext;
          +import org.apache.nifi.components.ValidationResult;
          +import org.apache.nifi.components.Validator;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.logging.ProcessorLog;
          +import org.apache.nifi.processor.AbstractProcessor;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.stream.io.BufferedInputStream;
          +import org.apache.nifi.util.StopWatch;
          +
          +import com.bazaarvoice.jolt.Shiftr;
          +import com.bazaarvoice.jolt.Removr;
          +import com.bazaarvoice.jolt.Chainr;
          +import com.bazaarvoice.jolt.Defaultr;
          +import com.bazaarvoice.jolt.Transform;
          +import com.bazaarvoice.jolt.JsonUtils;
          +
          +@EventDriven
          +@SideEffectFree
          +@SupportsBatching
          +@Tags( {"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"} )
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
          + + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
          + + "fails, the original FlowFile is routed to the 'failure' relationship")
          +public class TransformJSON extends AbstractProcessor {
          +
          + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data.");
          + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
          + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data.");
          +
          + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
          + .name("Jolt Specification")
          + .description("Jolt Specification for transform of JSON data.")
          + .required(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .addValidator(new JOLTSpecValidator())
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
          + .name("Jolt Transformation")
          + .description("Specifies the Jolt Transformation that should be used with the provided specification.")
          + .required(true)
          + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR)
          + .defaultValue(CHAINR.getValue())
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("The FlowFile with transformed content will be routed to this relationship")
          + .build();
          + public static final Relationship REL_FAILURE = new Relationship.Builder()
          + .name("failure")
          + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
          + .build();
          +
          + private final static List<PropertyDescriptor> properties;
          + private final static Set<Relationship> relationships;
          + private volatile Transform transform;
          +
          + static {
          +
          + final List<PropertyDescriptor> _properties = new ArrayList<>();
          + _properties.add(JOLT_TRANSFORM);
          + _properties.add(JOLT_SPEC);
          + properties = Collections.unmodifiableList(_properties);
          +
          + final Set<Relationship> _relationships = new HashSet<>();
          + _relationships.add(REL_SUCCESS);
          + _relationships.add(REL_FAILURE);
          + relationships = Collections.unmodifiableSet(_relationships);
          +
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + return relationships;
          + }
          +
          + @Override
          + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + return properties;
          + }
          +
          + @Override
          + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
          +
          + final FlowFile original = session.get();
          + if (original == null) {
          + return;
          + }
          +
          + final ProcessorLog logger = getLogger();
          + final StopWatch stopWatch = new StopWatch(true);
          +
          + try {
          +
          + FlowFile transformed = session.write(original, new StreamCallback() {
          + @Override
          + public void process(final InputStream rawIn, final OutputStream out) throws IOException {
          +
          + try (final InputStream in = new BufferedInputStream(rawIn)) {
          + Object inputJson = JsonUtils.jsonToObject(in);
          + Object transformedJson = transform.transform(inputJson);
          + out.write(JsonUtils.toJsonString(transformedJson).getBytes());
          — End diff –

          @davetorok I just submitted the latest changes, which address the points cited above.

          Show
          githubbot ASF GitHub Bot added a comment - Github user YolandaMDavis commented on a diff in the pull request: https://github.com/apache/nifi/pull/354#discussion_r60631450 — Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java — @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.StopWatch; + +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.Transform; +import com.bazaarvoice.jolt.JsonUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags( {"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr"} ) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) 
+@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. If the JSON transform " + + "fails, the original FlowFile is routed to the 'failure' relationship") +public class TransformJSON extends AbstractProcessor { + + public static final AllowableValue SHIFTR = new AllowableValue("Shift", "Shift Transform DSL", "This JOLT transformation will shift input JSON/data to create the output JSON/data."); + public static final AllowableValue CHAINR = new AllowableValue("Chain", "Chain Transform DSL", "Execute list of JOLT transformations."); + public static final AllowableValue DEFAULTR = new AllowableValue("Default", "Default Transform DSL", " This JOLT transformation will apply default values to the output JSON/data."); + public static final AllowableValue REMOVR = new AllowableValue("Remove", "Remove Transform DSL", " This JOLT transformation will apply default values to the output JSON/data."); + + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() + .name("Jolt Specification") + .description("Jolt Specification for transform of JSON data.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(new JOLTSpecValidator()) + .required(true) + .build(); + + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder() + .name("Jolt Transformation") + .description("Specifies the Jolt Transformation that should be used with the provided specification.") + .required(true) + .allowableValues(SHIFTR, CHAINR, DEFAULTR, REMOVR) + .defaultValue(CHAINR.getValue()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The FlowFile with transformed content will be routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + 
.name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship") + .build(); + + private final static List<PropertyDescriptor> properties; + private final static Set<Relationship> relationships; + private volatile Transform transform; + + static { + + final List<PropertyDescriptor> _properties = new ArrayList<>(); + _properties.add(JOLT_TRANSFORM); + _properties.add(JOLT_SPEC); + properties = Collections.unmodifiableList(_properties); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + try { + + FlowFile transformed = session.write(original, new StreamCallback() { + @Override + public void process(final InputStream rawIn, final OutputStream out) throws IOException { + + try (final InputStream in = new BufferedInputStream(rawIn)) { + Object inputJson = JsonUtils.jsonToObject(in); + Object transformedJson = transform.transform(inputJson); + out.write(JsonUtils.toJsonString(transformedJson).getBytes()); — End diff – @davetorok just submitted latest with changes cited above
          githubbot ASF GitHub Bot added a comment -

          GitHub user YolandaMDavis opened a pull request:

          https://github.com/apache/nifi/pull/405

          NIFI-361 - Create Processors to mutate JSON data

          Implementation of the TransformJSON processor using the Jolt library. TransformJSON supports Jolt specifications for the following transformations: Chain, Shift, Remove, Sort, Cardinality and Default. Users will be able to add the TransformJSON processor, select the transformation they wish to apply and enter the specification for the given transformation.

          Details for creating Jolt specifications can be found at https://github.com/bazaarvoice/jolt
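          To make the transformation names above more concrete, the following is a simplified, stdlib-only Java sketch of what the Default, Remove, Sort, and Chain steps do to a JSON object held as nested Maps. It is not the Jolt library and not the TransformJSON code: it assumes literal keys only (no wildcards, references, or arrays), and the class JoltSemanticsSketch and all of its methods are hypothetical. The Remove spec follows Jolt's convention of an empty-string value marking a key for deletion; the Sort step uses a plain TreeMap and ignores any special key ordering Jolt's own sort may apply. Shift and Cardinality are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Hypothetical, simplified illustration of Jolt-style Default/Remove/Sort/Chain semantics.
// Not the Jolt library: literal keys only, no wildcards, no arrays.
public class JoltSemanticsSketch {

    // Default: add entries from the defaults map where the key is absent in the input;
    // when both sides hold nested maps, merge them recursively.
    @SuppressWarnings("unchecked")
    static Map<String, Object> applyDefaults(Map<String, Object> input, Map<String, Object> defaults) {
        Map<String, Object> out = new LinkedHashMap<>(input);
        for (Map.Entry<String, Object> e : defaults.entrySet()) {
            Object existing = out.get(e.getKey());
            if (!out.containsKey(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            } else if (existing instanceof Map && e.getValue() instanceof Map) {
                out.put(e.getKey(), applyDefaults((Map<String, Object>) existing,
                        (Map<String, Object>) e.getValue()));
            }
        }
        return out;
    }

    // Remove: a spec value of "" deletes that key; a nested map recurses into the matching child object.
    @SuppressWarnings("unchecked")
    static Map<String, Object> remove(Map<String, Object> input, Map<String, Object> spec) {
        Map<String, Object> out = new LinkedHashMap<>(input);
        for (Map.Entry<String, Object> e : spec.entrySet()) {
            Object child = out.get(e.getKey());
            if ("".equals(e.getValue())) {
                out.remove(e.getKey());
            } else if (e.getValue() instanceof Map && child instanceof Map) {
                out.put(e.getKey(), remove((Map<String, Object>) child, (Map<String, Object>) e.getValue()));
            }
        }
        return out;
    }

    // Sort: rebuild every nested map with its keys in ascending alphabetical order.
    @SuppressWarnings("unchecked")
    static Object sort(Object node) {
        if (!(node instanceof Map)) {
            return node;
        }
        Map<String, Object> sorted = new TreeMap<>();
        for (Map.Entry<String, Object> e : ((Map<String, Object>) node).entrySet()) {
            sorted.put(e.getKey(), sort(e.getValue()));
        }
        return sorted;
    }

    // Chain: run each step in order, feeding one step's output into the next.
    static Object chain(Object input, List<UnaryOperator<Object>> steps) {
        Object current = input;
        for (UnaryOperator<Object> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    // Builds a small input document and runs Default, then Remove, then Sort as one chain.
    @SuppressWarnings("unchecked")
    static Object demo() {
        Map<String, Object> reading = new LinkedHashMap<>();
        reading.put("value", 21.5);
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("name", "sensor-1");
        input.put("debug", true);
        input.put("reading", reading);

        Map<String, Object> readingDefaults = new LinkedHashMap<>();
        readingDefaults.put("unit", "C");
        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("reading", readingDefaults);

        Map<String, Object> removeSpec = new LinkedHashMap<>();
        removeSpec.put("debug", "");

        List<UnaryOperator<Object>> steps = new ArrayList<>();
        steps.add(json -> applyDefaults((Map<String, Object>) json, defaults));
        steps.add(json -> remove((Map<String, Object>) json, removeSpec));
        steps.add(JoltSemanticsSketch::sort);
        return chain(input, steps);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {name=sensor-1, reading={unit=C, value=21.5}}
    }
}
```

          Because each step consumes the previous step's output, the order of a chain matters: moving the Sort step to the front, for instance, leaves "unit" after "value" in the output, since the later Default step appends it to an already sorted map.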

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/YolandaMDavis/nifi NIFI-361-0.x

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/405.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #405


          commit 873c534a057b051f3b28cac3cc496d59f0c352ef
          Author: Yolanda M. Davis <yolanda.m.davis@gmail.com>
          Date: 2016-04-14T12:19:41Z

          NIFI-361 Adding TransformJSON from master. Also changed streams in test since 1.7 only supported.
          (cherry picked from commit ffc9d19)


          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on the pull request:

          https://github.com/apache/nifi/pull/354#issuecomment-216614373

          Reviewed, LGTM and works well, thanks very much! +1

          jira-bot ASF subversion and git services added a comment -

          Commit 36f6514e803e34f2e823608988327bce5e51ec01 in nifi's branch refs/heads/0.x from Yolanda M. Davis
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=36f6514 ]

          NIFI-361 Adding TransformJSON from master. Also changed streams in test since 1.7 only supported. (cherry picked from commit ffc9d19)

          Signed-off-by: Matt Burgess <mattyb149@apache.org>

          This closes #405

          jira-bot ASF subversion and git services added a comment -

          Commit defb6f5b6103ba7ee6be1644f7649402cdf97953 in nifi's branch refs/heads/master from Yolanda M. Davis
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=defb6f5 ]

          NIFI-361 - Create Processors to mutate JSON data

          Signed-off-by: Matt Burgess <mattyb149@apache.org>

          This closes #354

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/354

          githubbot ASF GitHub Bot added a comment -

          Github user YolandaMDavis closed the pull request at:

          https://github.com/apache/nifi/pull/405

          YolandaMDavis Yolanda M. Davis added a comment -

          Was merged into 0.7.0 and 1.0.0

            People

            • Assignee:
              YolandaMDavis Yolanda M. Davis
              Reporter:
              aldrin Aldrin Piri
            • Votes:
              0
              Watchers:
              9