Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Works for Me
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

Description

      With the new RecordPath DSL, it would be nice if there were a processor that could pull fields into attributes of the flowfile based on a RecordPath. This would be similar to the EvaluateJsonPath processor that currently exists, except it could be used to pull fields from arbitrary record formats. My current use case would be pulling fields out of Avro records while skipping the steps of converting Avro to JSON, evaluating JsonPath, and then converting back to Avro.
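      For illustration of what such a processor might do per record: the field, attribute name, and surrounding variables below are made up, and the snippet assumes the RecordPath API from the nifi-record-path module rather than any agreed design.

          // Assumed imports: org.apache.nifi.record.path.{RecordPath, RecordPathResult, FieldValue}, java.util.Optional
          final RecordPath path = RecordPath.compile("/account/id");            // hypothetical field
          final RecordPathResult result = path.evaluate(record);                // 'record' comes from any configured Record Reader (Avro, CSV, JSON, ...)
          final Optional<FieldValue> match = result.getSelectedFields().findFirst();
          match.ifPresent(fv -> attributes.put("account.id", String.valueOf(fv.getValue())));   // 'attributes' is an assumed Map<String, String>

      The equivalent today for Avro is a round trip, e.g. ConvertAvroToJSON, then EvaluateJsonPath on $.account.id, then ConvertJSONToAvro.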

        Issue Links

          Activity

          joewitt Joseph Witt added a comment -

          Steve Champagne A key consideration here is that a given flowfile can contain one or many records, so extracting a given record field into a flowfile attribute is potentially a many-to-one problem. Alternatively, and by intent, if you need to ensure that all records of a given flowfile share some characteristic or field value, you can use the upcoming NiFi 1.3.0 PartitionRecord processor to ensure that, for a given flowfile, all records do indeed have a matching value. It is powered by RecordPath and it extracts the matched value to an attribute.
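          For illustration only (the property name and RecordPath below are made up), a PartitionRecord configuration along these lines:

              Record Reader   : AvroReader
              Record Writer   : AvroRecordSetWriter
              customer_region : /customer/region     <- user-added property whose value is a RecordPath

          would emit one flowfile per distinct value of /customer/region, with every record in that flowfile sharing the value and the value itself written to a customer_region attribute.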

          champagst Steve Champagne added a comment -

          Ah, this is exactly what I'm looking for. Thanks for the heads up!

          joewitt Joseph Witt added a comment -

          Cool. Please go ahead and close this out as OBE if you are able to give that a go and agree it serves your needs.

          champagst Steve Champagne added a comment -

          My only gripe is that it doesn't include fragment information when it splits the flowfile, which I'd need to merge the pieces back together. I think I can work around that, though. Other than that it's exactly what I'm looking for. I marked it "Works for Me" since I couldn't find an OBE resolution. Thanks again!
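          For reference, the "fragment information" here means the standard defragment attributes that the split-style processors write and that MergeContent's Defragment strategy looks for:

              fragment.identifier   - same value on every piece split from one parent flowfile
              fragment.index        - position of this piece within the parent
              fragment.count        - total number of pieces

          PartitionRecord does not emit these, so merging its outputs back together has to key off something else (or use a bin-packing merge strategy).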

          joewitt Joseph Witt added a comment -

          Yeah, that is true. I'm not sure how that would work in the case of partitioning, though. With split it makes sense, since we're just taking sequential chunks. With partition we're not doing that, so I'm not sure fragment tracking would really work. We do need to provide a MergeRecord processor though!

          githubbot ASF GitHub Bot added a comment -

          GitHub user MikeThomsen opened a pull request:

          https://github.com/apache/nifi/pull/1961

          NIFI-4024 Added org.apache.nifi.hbase.PutHBaseRecord

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

          For all changes:
          • [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
          • [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [ ] Is your initial contribution a single, squashed commit?

          For code changes:
          • [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [ ] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

          For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

          Note:
          Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/MikeThomsen/nifi NIFI-4024

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1961.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1961


          commit f624e69f88fd84e619a5044abfda250a755d96d6
          Author: Mike Thomsen <mikerthomsen@gmail.com>
          Date: 2017-06-23T11:50:26Z

          NIFI-4024 Added org.apache.nifi.hbase.PutHBaseRecord


          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r125640198

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,316 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.hadoop.hbase.util.Bytes;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
          — End diff –

          We should go through all the property descriptors, allowable values, etc. and make sure references to "JSON" are appropriately replaced with "Record".
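          For example, the description quoted above might become something like (wording is only a suggestion):

              .description("Specifies the name of a record field whose value should be used as the row id for the given record.")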

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r125639967

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml —
          @@ -82,5 +90,15 @@
          </exclusions>
          <scope>test</scope>
          </dependency>
          + <dependency>
          + <groupId>org.apache.nifi</groupId>
          + <artifactId>nifi-mock-record-utils</artifactId>
          + <scope>test</scope>
          + </dependency>
          + <dependency>
          + <groupId>org.apache.hbase</groupId>
          + <artifactId>hbase-common</artifactId>
          — End diff –

          We should keep all the HBase client dependencies behind HBaseClientService so that the version of the client doesn't leak into the processors NAR.

          It looks like the main reason for adding this was to use the `Bytes` class from HBase common, which we ran into once before and ended up exposing some `toBytes` methods on `HBaseClientService`. We should add the additional `toBytes` methods that we need, or we could possibly add a method like:

          ` byte[] asBytes(String field, RecordFieldType fieldType, Record record)`
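          A rough sketch of how that might look on the HBaseClientService implementation follows; the method itself is only the suggestion above (not existing API), and the field types mirror the ones the PR already handles.

              // Hypothetical addition to the HBaseClientService implementation, so that
              // org.apache.hadoop.hbase.util.Bytes stays behind the service and out of the processor NAR.
              public byte[] asBytes(final String field, final RecordFieldType fieldType, final Record record) {
                  switch (fieldType) {
                      case BOOLEAN:
                          return Bytes.toBytes(record.getAsBoolean(field));
                      case DOUBLE:
                          return Bytes.toBytes(record.getAsDouble(field));
                      case FLOAT:
                          return Bytes.toBytes(record.getAsFloat(field));
                      case INT:
                          return Bytes.toBytes(record.getAsInt(field));
                      case LONG:
                          return Bytes.toBytes(record.getAsLong(field));
                      default:
                          // CHAR, STRING, and anything else fall back to the UTF-8 string form
                          return Bytes.toBytes(record.getAsString(field));
                  }
              }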

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r125645738

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,316 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.hadoop.hbase.util.Bytes;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the JSON.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "JSON to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the JSON, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          — End diff –

          See later comment about possibly changing the meaning of batch size, if we did that we would only grab one flow file here.
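          Sketch of what that would look like, for illustration only:

              FlowFile flowFile = session.get();   // one flow file per invocation; "Batch Size" would now count records, not flow files
              if (flowFile == null) {
                  return;
              }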

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r125645539

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,316 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.hadoop.hbase.util.Bytes;
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.TimeUnit;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the JSON.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "JSON to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the JSON, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + List<FlowFile> flowFiles = session.get(batchSize);
          + if (flowFiles == null || flowFiles.size() == 0) {
          + return;
          + }
          +
          + final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          +
          + // Group FlowFiles by HBase Table
          +
          + final Map<FlowFile, List<PutFlowFile>> results = new HashMap<>();
          +
          + for (final FlowFile flowFile : flowFiles) {
          + getLogger().info("Working on flowfile with size {}", new Object[] {flowFile.getSize()});
          + final List<PutFlowFile> puts = createPuts(session, context, flowFile, recordParserFactory);
          +
          + if (puts.size() == 0) {
          + getLogger().info("Put list was empty, skipping...");
          + session.transfer(flowFile, REL_FAILURE);
          + continue;
          + }
          +
          + boolean foundInvalid = false;
          + for (PutFlowFile putFlowFile : puts) {
          + if (!putFlowFile.isValid() && !foundInvalid) {
          + session.transfer(flowFile, REL_FAILURE);
          + foundInvalid = true;
          + }
          + }
          +
          + if (!foundInvalid) {
          + String tableName = puts.get(0).getTableName();
          + List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
          + if (putFlowFiles == null) {
          + putFlowFiles = new ArrayList<>();
          + tablePuts.put(tableName, putFlowFiles);
          + }
          + putFlowFiles.addAll(puts);
          +
          + results.put(flowFile, puts);
          + }
          + }
          +
          + getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
          +
          + final long start = System.nanoTime();
          + final List<FlowFile> successes = new ArrayList<>();
          + final Map<FlowFile, Integer> columnsAdded = new HashMap<>();
          + final Map<FlowFile, List<String>> provenanceEvents = new HashMap<>();
          +
          + for (Map.Entry<FlowFile, List<PutFlowFile>> entry : results.entrySet()) {
          + try {
          + clientService.put(entry.getValue().get(0).getTableName(), entry.getValue());
          + successes.add(entry.getKey());
          +
          + int columns = 0;
          + for (PutFlowFile putFlowFile : entry.getValue()) {
          + columns += putFlowFile.getColumns().size();
          + }
          + PutFlowFile first = entry.getValue().get(0);
          + PutFlowFile last = entry.getValue().get(entry.getValue().size() - 1);
          + List<String> events = Arrays.asList(getTransitUri(first), getTransitUri(last));
          +
          + columnsAdded.put(entry.getKey(), new Integer(columns));
          + provenanceEvents.put(entry.getKey(), events);
          + } catch (Exception e) {
          + getLogger().error(e.getMessage(), e);
          +
          + final FlowFile failure = session.penalize(entry.getKey());
          + session.transfer(failure, REL_FAILURE);
          + }
          + }
          +
          + final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
          + getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
          +
          + for (FlowFile flowFile : successes) {
          + session.transfer(flowFile, REL_SUCCESS);
          + final String details = "Put " + columnsAdded.get(flowFile) + " cells to HBase";
          + session.getProvenanceReporter().send(flowFile, provenanceEvents.get(flowFile).get(0), details, sendMillis);
          + session.getProvenanceReporter().send(flowFile, provenanceEvents.get(flowFile).get(0), details, sendMillis);
          + }
          +
          + session.commit();
          + }
          +
          +
          + @Override
          + protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
          + return null;
          + }
          +
          + protected byte[] asBytes(String field, Object input) {
          + byte[] retVal = null;
          +
          + if (input instanceof Number) {
          + if (input instanceof Float || input instanceof Double) {
          + retVal = clientService.toBytes(Double.parseDouble(input.toString()));
          + } else {
          + retVal = clientService.toBytes(Long.parseLong(input.toString()));
          + }
          + } else if (input instanceof Boolean) {
          + retVal = clientService.toBytes((Boolean)input);
          + } else if (input instanceof String) {
          + retVal = clientService.toBytes((String)input);
          + } else {
          + throw new RuntimeException(String.format("Could not identify type for field %", field));
          + }
          +
          + return retVal;
          + }
          +
          + protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString) {
          +
          + byte[] retVal;
          +
          + if (asString) {
          + retVal = Bytes.toBytes(record.getAsString(field));
          + } else {
          + switch (fieldType) {
          + case BOOLEAN:
          + retVal = Bytes.toBytes(record.getAsBoolean(field));
          + break;
          + case CHAR:
          + retVal = Bytes.toBytes(record.getAsString(field));
          + break;
          + case DOUBLE:
          + retVal = Bytes.toBytes(record.getAsDouble(field));
          + break;
          + case FLOAT:
          + retVal = Bytes.toBytes(record.getAsFloat(field));
          + break;
          + case INT:
          + retVal = Bytes.toBytes(record.getAsInt(field));
          + break;
          + case LONG:
          + retVal = Bytes.toBytes(record.getAsLong(field));
          + break;
          + default:
          + retVal = Bytes.toBytes(record.getAsString(field));
          + }
          + }
          +
          + return retVal;
          + }
          +
          + protected List<PutFlowFile> createPuts(ProcessSession session, ProcessContext context, FlowFile flowFile, RecordReaderFactory recordParserFactory) {
          — End diff –

          I think we should try to avoid reading the entire flow file into memory if possible...

          With PutHBaseJSON we had one JSON message per flow file, so the # of messages in memory at one time would be equal to the batch size. With the record oriented processors, the whole idea is to keep a lot of records together for efficiency, so we might have millions of records in a single flow file which could lead to memory issues if they were all read into memory.

          A possible approach to deal with this could be the following...

          We could change the meaning of Batch Size in this processor, and instead of it being the number of flow files to grab, it could be the number of records to send to HBase in one call. So the processor grabs one flow file, creates a record reader and starts reading records up to batch size, sends the batch to HBase, and then starts a new batch. We can also keep track of the index of the last successful record, and if an error occurs at any point we then route the flow file to failure and add an attribute like "record.successful.index.<processor-id>". If the processor receives a flow file with this attribute already populated, then it will pick up from this index when sending records.

          Thoughts?
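          To make the suggested approach concrete, here is a rough sketch of the record-batching loop. The helper toPut, the reader-factory signature, and the locals flowFile, batchSize, tableName, recordParserFactory, and clientService are assumptions carried over from the PR's onTrigger, not an agreed design.

              // Sketch only: "Batch Size" now means records per HBase call; the resume index rides along as a flow file attribute.
              final String resumeAttr = "record.successful.index." + getIdentifier();
              final String resumeValue = flowFile.getAttribute(resumeAttr);
              long lastOk = resumeValue == null ? 0L : Long.parseLong(resumeValue);
              long index = 0;
              final List<PutFlowFile> batch = new ArrayList<>();

              try (final InputStream in = session.read(flowFile);
                   final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
                  Record record;
                  while ((record = reader.nextRecord()) != null) {
                      if (index++ < lastOk) {
                          continue;                                   // already stored by a previous, partially successful attempt
                      }
                      batch.add(toPut(flowFile, record));             // hypothetical per-record conversion helper
                      if (batch.size() >= batchSize) {
                          clientService.put(tableName, batch);        // one round trip per batch of records
                          lastOk = index;
                          batch.clear();
                      }
                  }
                  if (!batch.isEmpty()) {
                      clientService.put(tableName, batch);
                      lastOk = index;
                  }
                  session.transfer(flowFile, REL_SUCCESS);
              } catch (final Exception e) {
                  // Remember how far we got so a retry can resume instead of re-sending everything.
                  flowFile = session.putAttribute(flowFile, resumeAttr, String.valueOf(lastOk));
                  session.transfer(session.penalize(flowFile), REL_FAILURE);
              }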

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @bbende I ran this against a large body of our test data, and it seemed to work just fine.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126214244

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
          + for (PutFlowFile put : flowFiles) {
          + columns += put.getColumns().size();
          + }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
          + if (startIndexStr != null) {
          + startIndex = Integer.parseInt(startIndexStr);
          + }
          +
          + PutFlowFile first = null;
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy);
          + flowFiles.add(putFlowFile);
          + if (index == 0) {
          + first = putFlowFile;
          + }
          + index++;
          +
          + if (flowFiles.size() == batchSize) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + flowFiles = new ArrayList<>();
          + }
          + }
          + if (flowFiles.size() > 0) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + }
          + } catch (Exception ex) {
          + getLogger().error("Failed to put records to HBase.", ex);
          + failed = true;
          + }
          +
          + if (!failed) {
          + long sendMillis = System.nanoTime() - start;
          + List<String> urls = Arrays.asList(getTransitUri(first), getTransitUri(last));
          + final String details = String.format("Put %d cells to HBase.", columns);
          + session.getProvenanceReporter().send(flowFile, urls.get(0), details, sendMillis);
          + session.getProvenanceReporter().send(flowFile, urls.get(1), details, sendMillis);
          + session.transfer(flowFile, REL_SUCCESS);
          + } else {
          + String restartIndex = Integer.toString(index - flowFiles.size());
          + flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
          + flowFile = session.penalize(flowFile);
          + session.transfer(flowFile, REL_FAILURE);
          + }
          +
          +
          +
          + session.commit();
          + }
          +
          +
          + @Override
          + protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
          + return null;
          + }
          +
          + protected byte[] asBytes(String field, Object input) {
          — End diff –

          Looks like this method might not be used anymore and could be removed?
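          As a concrete illustration of the Field Encoding Strategy description quoted in the diff above (String vs. Bytes), the following standalone sketch is not taken from the PR; it only uses HBase's org.apache.hadoop.hbase.util.Bytes utility to show the two byte representations an integer field would get under each option.

          import org.apache.hadoop.hbase.util.Bytes;

          public class FieldEncodingDemo {
              public static void main(String[] args) {
                  int value = 42;
                  // "String" strategy: the value is first converted to a String, then stored as its UTF-8 bytes.
                  byte[] asString = Bytes.toBytes(String.valueOf(value)); // 2 bytes: '4', '2'
                  // "Bytes" strategy: the value keeps its record type and is stored as the typed byte representation.
                  byte[] asTyped = Bytes.toBytes(value); // 4 bytes: big-endian int
                  System.out.println(asString.length + " vs " + asTyped.length); // prints "2 vs 4"
              }
          }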

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126216880

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
          + for (PutFlowFile put : flowFiles) {
          + columns += put.getColumns().size();
          + }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
          + if (startIndexStr != null) {
          + startIndex = Integer.parseInt(startIndexStr);
          + }
          +
          + PutFlowFile first = null;
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy);
          + flowFiles.add(putFlowFile);
          + if (index == 0) {
          + first = putFlowFile;
          + }
          + index++;
          +
          + if (flowFiles.size() == batchSize) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + flowFiles = new ArrayList<>();
          + }
          + }
          + if (flowFiles.size() > 0) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + }
          + } catch (Exception ex) {
          + getLogger().error("Failed to put records to HBase.", ex);
          + failed = true;
          + }
          +
          + if (!failed) {
          + long sendMillis = System.nanoTime() - start;
          + List<String> urls = Arrays.asList(getTransitUri(first), getTransitUri(last));
          + final String details = String.format("Put %d cells to HBase.", columns);
          + session.getProvenanceReporter().send(flowFile, urls.get(0), details, sendMillis);
          + session.getProvenanceReporter().send(flowFile, urls.get(1), details, sendMillis);
          + session.transfer(flowFile, REL_SUCCESS);
          — End diff –

          We should remove the restart.index attribute before transferring to success, since the retry would already be done if the FlowFile made it this far.
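          A minimal sketch of that suggestion, assuming the restart.index attribute name used elsewhere in the diff and NiFi's ProcessSession.removeAttribute; this is a hypothetical follow-up, not code from the PR:

          if (!failed) {
              // Drop the restart marker once the whole record set has been sent successfully,
              // so a replayed or re-routed FlowFile is not treated as a partial retry.
              flowFile = session.removeAttribute(flowFile, "restart.index");
              session.transfer(flowFile, REL_SUCCESS);
          }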

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126218471

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
          + for (PutFlowFile put : flowFiles) {
          + columns += put.getColumns().size();
          + }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
          + if (startIndexStr != null) {
          + startIndex = Integer.parseInt(startIndexStr);
          + }
          +
          + PutFlowFile first = null;
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy);
          + flowFiles.add(putFlowFile);
          + if (index == 0) {
          + first = putFlowFile;
          + }
          + index++;
          +
          + if (flowFiles.size() == batchSize) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + flowFiles = new ArrayList<>();
          + }
          + }
          + if (flowFiles.size() > 0) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + }
          + } catch (Exception ex) {
          + getLogger().error("Failed to put records to HBase.", ex);
          + failed = true;
          + }
          +
          + if (!failed) {
          + long sendMillis = System.nanoTime() - start;
          + List<String> urls = Arrays.asList(getTransitUri(first), getTransitUri(last));
          + final String details = String.format("Put %d cells to HBase.", columns);
          + session.getProvenanceReporter().send(flowFile, urls.get(0), details, sendMillis);
          + session.getProvenanceReporter().send(flowFile, urls.get(1), details, sendMillis);
          + session.transfer(flowFile, REL_SUCCESS);
          + } else {
          + String restartIndex = Integer.toString(index - flowFiles.size());
          + flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
          + flowFile = session.penalize(flowFile);
          + session.transfer(flowFile, REL_FAILURE);
          + }
          +
          +
          +
          + session.commit();
          + }
          +
          +
          + @Override
          + protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
          + return null;
          + }
          +
          + protected byte[] asBytes(String field, Object input) {
          + byte[] retVal = null;
          +
          + if (input instanceof Number) {
          + if (input instanceof Float || input instanceof Double) {
          + retVal = clientService.toBytes(Double.parseDouble(input.toString()));
          + } else {
          + retVal = clientService.toBytes(Long.parseLong(input.toString()));
          + }
          + } else if (input instanceof Boolean) {
          + retVal = clientService.toBytes((Boolean)input);
          + } else if (input instanceof String) {
          + retVal = clientService.toBytes((String)input);
          + } else {
          + throw new RuntimeException(String.format("Could not identify type for field %", field));
          + }
          +
          + return retVal;
          + }
          +
          + protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString) {
          +
          + byte[] retVal;
          +
          + if (asString) {
          + retVal = clientService.toBytes(record.getAsString(field));
          + } else {
          + switch (fieldType) {
          + case BOOLEAN:
          + retVal = clientService.toBytes(record.getAsBoolean(field));
          + break;
          + case CHAR:
          + retVal = clientService.toBytes(record.getAsString(field));
          + break;
          + case DOUBLE:
          + retVal = clientService.toBytes(record.getAsDouble(field));
          + break;
          + case FLOAT:
          + retVal = clientService.toBytes(record.getAsFloat(field));
          + break;
          + case INT:
          + retVal = clientService.toBytes(record.getAsInt(field));
          + break;
          + case LONG:
          + retVal = clientService.toBytes(record.getAsLong(field));
          + break;
          + default:
          + retVal = clientService.toBytes(record.getAsString(field));
          + }
          + }
          +
          + return retVal;
          + }
          +
          + protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName, String columnFamily, String fieldEncodingStrategy) {
          + PutFlowFile retVal = null;
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          +
          + boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
          +
          + final byte[] fam = clientService.toBytes(columnFamily);
          +
          + try {
          + if (record != null) {
          + List<PutColumn> columns = new ArrayList<>();
          + for (String name : schema.getFieldNames()) {
          + if (name.equals(rowFieldName)) {
          + continue;
          + }
          + columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString)));
          + }
          + retVal = new PutFlowFile(tableName, clientService.toBytes(record.getAsString(rowFieldName)), columns, flowFile);
          — End diff –

          We should pass down the "Row ID Encoding Strategy" and then do something like...

          ```
          String rowIdValue = record.getAsString(rowFieldName);
          byte[] rowId = getRow(rowIdValue, rowEncoding);
          ```
          We should also do something to handle the case where the specified rowFieldName does not exist in the schema, or it does but that field happens to be null in the given record.

          Currently either of those will cause an exception and route the flow file to failure, but the error that is produced is just a NullPointerException, which makes it hard for a user looking at the UI to know what actually happened.

          If `record.getAsString(rowFieldName)` returns null then we can throw an exception out of this method with a nicer message like "Unable to get row id from <field name>" or something similar.
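
          For illustration, a rough sketch of how that null check and the row id encoding could fit together inside `createPut` (the exact exception type and how `rowEncoding` gets passed down are assumptions for the sketch, not part of the current patch):

          ```
          String rowIdValue = record.getAsString(rowFieldName);
          if (rowIdValue == null) {
              // Fail with a readable message instead of letting a NullPointerException surface later;
              // this covers both a missing field and a field that is null for this particular record.
              throw new RuntimeException(String.format("Unable to get row id from field '%s' for the given record", rowFieldName));
          }

          // Assumes the Row ID Encoding Strategy value is passed down into createPut and that the
          // getRow(String, String) helper converts the value according to that strategy.
          byte[] rowId = getRow(rowIdValue, rowEncoding);
          retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
          ```

          The caller in onTrigger could then catch that exception and include the message when routing the FlowFile to failure.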

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126219285

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags( {"hadoop", "hbase", "put", "record"} )
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          — End diff –

          I think we can remove ROW_ID here since we are only using ROW_FIELD_NAME to get the row id from a record.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126223383

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags( {"hadoop", "hbase", "put", "record"} )
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          — End diff –

          I'm ok with leaving this as is, but just wanted to mention that we could make this "Row Id Record Path", which would take a record path that would be evaluated to obtain the row id value. This way it can get a row id from something more complex than a top-level field.

          I think the current functionality supports most use-cases though.
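
          As a rough sketch of that alternative (the `ROW_ID_RECORD_PATH` property name and the error handling are assumptions, not an agreed design), the processor could compile the configured path and evaluate it per record:

          ```
          final String rowIdPath = context.getProperty(ROW_ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
          final RecordPath compiledPath = RecordPath.compile(rowIdPath);

          // Take the first field selected by the record path and use its value as the row id.
          final Optional<FieldValue> rowIdField = compiledPath.evaluate(record).getSelectedFields().findFirst();
          if (!rowIdField.isPresent() || rowIdField.get().getValue() == null) {
              throw new RuntimeException("Record path " + rowIdPath + " did not select a row id for the given record");
          }
          final byte[] rowId = clientService.toBytes(rowIdField.get().getValue().toString());
          ```

          Something like a RecordPathCache could avoid recompiling the path for every record, but the current field-name approach covers the common case with less machinery.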

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126216557

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags( {"hadoop", "hbase", "put", "record"} )
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_ID);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
          + for (PutFlowFile put : flowFiles) {
          + columns += put.getColumns().size();
          + }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
          + if (startIndexStr != null) {
          + startIndex = Integer.parseInt(startIndexStr);
          + }
          +
          + PutFlowFile first = null;
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy);
          + flowFiles.add(putFlowFile);
          + if (index == 0) {
          + first = putFlowFile;
          + }
          + index++;
          +
          + if (flowFiles.size() == batchSize) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + flowFiles = new ArrayList<>();
          + }
          + }
          + if (flowFiles.size() > 0) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + }
          + } catch (Exception ex) {
          + getLogger().error("Failed to put records to HBase.", ex);
          + failed = true;
          + }
          +
          + if (!failed) {
          + long sendMillis = System.nanoTime() - start;
          + List<String> urls = Arrays.asList(getTransitUri(first), getTransitUri(last));
          + final String details = String.format("Put %d cells to HBase.", columns);
          + session.getProvenanceReporter().send(flowFile, urls.get(0), details, sendMillis);
          + session.getProvenanceReporter().send(flowFile, urls.get(1), details, sendMillis);
          + session.transfer(flowFile, REL_SUCCESS);
          + } else {
          + String restartIndex = Integer.toString(index - flowFiles.size());
          + flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
          + flowFile = session.penalize(flowFile);
          + session.transfer(flowFile, REL_FAILURE);
          + }
          +
          +
          +
          + session.commit();
          + }
          +
          +
          + @Override
          + protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
          + return null;
          + }
          +
          + protected byte[] asBytes(String field, Object input) {
          + byte[] retVal = null;
          +
          + if (input instanceof Number) {
          + if (input instanceof Float || input instanceof Double) {
          + retVal = clientService.toBytes(Double.parseDouble(input.toString()));
          + } else {
          + retVal = clientService.toBytes(Long.parseLong(input.toString()));
          + }
          + } else if (input instanceof Boolean) {
          + retVal = clientService.toBytes((Boolean)input);
          + } else if (input instanceof String) {
          + retVal = clientService.toBytes((String)input);
          + } else {
          + throw new RuntimeException(String.format("Could not identify type for field %", field));
          + }
          +
          + return retVal;
          + }
          +
          + protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString) {
          +
          + byte[] retVal;
          +
+ if (asString) {
+ retVal = clientService.toBytes(record.getAsString(field));
+ } else {
          + switch (fieldType) {
          + case BOOLEAN:
          + retVal = clientService.toBytes(record.getAsBoolean(field));
          + break;
          + case CHAR:
          + retVal = clientService.toBytes(record.getAsString(field));
          + break;
          + case DOUBLE:
          + retVal = clientService.toBytes(record.getAsDouble(field));
          + break;
          + case FLOAT:
          + retVal = clientService.toBytes(record.getAsFloat(field));
          + break;
          + case INT:
          + retVal = clientService.toBytes(record.getAsInt(field));
          + break;
          + case LONG:
          + retVal = clientService.toBytes(record.getAsLong(field));
          + break;
          + default:
          + retVal = clientService.toBytes(record.getAsString(field));
          — End diff –

It seems like we should be using the "Complex Field Strategy" here... basically, if it's not one of the flat value types, determine what to do with it based on the strategy, similar to this:

          https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java#L215-L231

Alternatively, I suppose we could just remove the strategy property and document the behavior, as long as it's very clear what will happen to complex fields.
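For illustration, here is a rough sketch of how the default branch of asBytes() could defer to the Complex Field Strategy, loosely modeled on the PutHBaseJSON handling linked above. The helper name handleComplexField and its exact wiring are hypothetical, not part of the PR:

```
// Hypothetical helper inside PutHBaseRecord (sketch only): applies the configured
// Complex Field Strategy to a field whose type is not one of the flat value types.
// Returning null means "do not include this field in the row sent to HBase".
private byte[] handleComplexField(final String field, final Record record, final String complexFieldStrategy) {
    switch (complexFieldStrategy) {
        case FAIL_VALUE:
            // Caller would route the whole FlowFile to failure on this exception.
            throw new ProcessException("Complex value found for field '" + field + "'");
        case WARN_VALUE:
            getLogger().warn("Complex value found for field '" + field + "'; skipping it");
            return null;
        case TEXT_VALUE:
            return clientService.toBytes(record.getAsString(field));
        case IGNORE_VALUE:
        default:
            return null;
    }
}
```

The default case of the switch in asBytes() would then call this helper instead of unconditionally falling back to the string representation.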

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126217156

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags( {"hadoop", "hbase", "put", "record"} )
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          — End diff –

We should add @ReadsAttribute and @WritesAttribute annotations to document when "restart.index" will be written and when/how it will be read and used.
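For reference, the annotations might look roughly like this on the processor class (the description strings here are placeholders, not the final wording):

```
@ReadsAttribute(attribute = "restart.index", description = "Read when replaying a record set whose earlier records already reached HBase.")
@WritesAttribute(attribute = "restart.index", description = "Written when a batch fails so that a retry can skip the records already sent.")
public class PutHBaseRecord extends AbstractPutHBase {
    // ...
}
```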

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126231150

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,309 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags( {"hadoop", "hbase", "put", "record"} )
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Name")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_ID);
+ properties.add(ROW_FIELD_NAME);
+ properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(COLUMN_FAMILY);
+ properties.add(BATCH_SIZE);
+ properties.add(COMPLEX_FIELD_STRATEGY);
+ properties.add(FIELD_ENCODING_STRATEGY);
+ return properties;
+ }
          +
          + @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
+ for (PutFlowFile put : flowFiles) {
+ columns += put.getColumns().size();
+ }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
+ if (startIndexStr != null) {
+ startIndex = Integer.parseInt(startIndexStr);
+ }
+
          + PutFlowFile first = null;
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy);
          + flowFiles.add(putFlowFile);
+ if (index == 0) {
+ first = putFlowFile;
+ }
+ index++;
+
+ if (flowFiles.size() == batchSize) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ flowFiles = new ArrayList<>();
+ }
+ }
+ if (flowFiles.size() > 0) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Failed to put records to HBase.", ex);
+ failed = true;
+ }
          +
          + if (!failed) {
          + long sendMillis = System.nanoTime() - start;
          + List<String> urls = Arrays.asList(getTransitUri(first), getTransitUri(last));
          + final String details = String.format("Put %d cells to HBase.", columns);
          + session.getProvenanceReporter().send(flowFile, urls.get(0), details, sendMillis);
          — End diff –

          We could probably send a single event here since everything we sent came from the same flow file.

          We can override getTransitUri so that it doesn't include the row id since that only made sense before when it was a row per flow file:
```
protected String getTransitUri(PutFlowFile putFlowFile) {
    return "hbase://" + putFlowFile.getTableName();
}
```
          Then in the details we can have the number of records sent like you have, or possibly the range from start index to end index.

We should also consider sending a provenance event in the failure case. Maybe in the failure case we can look at the value of columns, and if it's > 0 then we know we sent some data successfully and can report a send event in that case.
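A minimal sketch of what that could look like, using the variable names from the diff above; the transit URI format and details strings are illustrative only, and note the nanosecond-to-millisecond conversion, which the current diff appears to skip:

```
// Sketch only: report a single send event per FlowFile, and a best-effort event on failure.
// Assumes java.util.concurrent.TimeUnit is imported.
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final String transitUri = "hbase://" + tableName;  // row id omitted from the URI

if (!failed) {
    session.getProvenanceReporter().send(flowFile, transitUri,
            String.format("Put %d cells to HBase.", columns), sendMillis);
    session.transfer(flowFile, REL_SUCCESS);
} else {
    if (columns > 0) {
        // Some batches made it to HBase before the failure; record what was actually sent.
        session.getProvenanceReporter().send(flowFile, transitUri,
                String.format("Put %d cells to HBase before failing.", columns), sendMillis);
    }
    flowFile = session.putAttribute(flowFile, "restart.index", Integer.toString(index - flowFiles.size()));
    session.transfer(session.penalize(flowFile), REL_FAILURE);
}
```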

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126431845

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -207,6 +211,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
          final String details = String.format("Put %d cells to HBase.", columns);
          session.getProvenanceReporter().send(flowFile, urls.get(0), details, sendMillis);
          session.getProvenanceReporter().send(flowFile, urls.get(1), details, sendMillis);
          — End diff –

We still have two calls to send a provenance event; I think we can get rid of the second one, and we still need to send an event before transferring to failure.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126431547

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -49,10 +51,12 @@
          @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags( {"hadoop", "hbase", "put", "record"} )
          @CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
          +@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be insert into HBase")
          public class PutHBaseRecord extends AbstractPutHBase {

          protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()

          • .name("Row Identifier Field Name")
            + .name("Row Identifier Record Path")
              • End diff –

I think I confused things here with my previous comment... If we made this property a record path, then it would require more code later on to evaluate the record path against the record, which we aren't doing right now, so let's put this back to "Row Identifier Field Name". Sorry for the confusion.
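In other words, the descriptor would go back to something like its earlier form from the diff above:

```
protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
        .name("Row Identifier Field Name")
        .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
```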

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r126430219

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -295,13 +302,21 @@ protected PutFlowFile createPut(ProcessContext context, Record record, RecordSch
if (name.equals(rowFieldName)) {
continue;
}
- columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString)));
+ columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
+ schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
+ }
+ String rowIdValue = record.getAsString(rowFieldName);
+ if (rowIdValue == null) {
+ throw new Exception(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
+ }
- retVal = new PutFlowFile(tableName, clientService.toBytes(record.getAsString(rowFieldName)), columns, flowFile);
+ byte[] rowId = getRow(rowIdValue, fieldEncodingStrategy);
— End diff –

          Should this be the "row encoding strategy" rather than the "field encoding strategy"?

There are actually separate properties for these (FIELD_ENCODING_STRATEGY vs. ROW_ID_ENCODING_STRATEGY).
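If so, the row id would presumably be resolved with the dedicated property rather than the field strategy, along these lines (sketch only; assumes the getRow(String, String) helper used in the diff above):

```
// Sketch: use the Row Identifier Encoding Strategy for the row key,
// keeping the Field Encoding Strategy for column values only.
final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
final byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
```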

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @bbende Updated. Let me know what you think when you get a chance. Thanks.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @MikeThomsen thanks Mike... I added a couple of comments this morning, not sure if you saw those yet, but I think we're close.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @bbende Ok, should be good to go now.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @bbende Saw on the mailing list that you've been away for a little while. Any chance you're back and ready to take a look?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @MikeThomsen I should be able to take a look early next week

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r130412355

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,323 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.ReadsAttribute;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
          +@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be insert into HBase")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Path")
          — End diff –

          Minor, but can we go back to calling this "Row Identifier Field Name"?
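For illustration only, a hedged sketch of the descriptor with the name the reviewer is asking for; apart from the rename (and the optional displayName, which follows the pattern the Record Reader property in this diff already uses), everything is taken from the diff above.

    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
        .name("Row Identifier Field Name")           // stable property name, as before the rename
        .displayName("Row Identifier Field Name")    // optional user-facing label
        .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();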

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r130413186

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,323 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.ReadsAttribute;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
          +@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be insert into HBase")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Path")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_FIELD_NAME);
+ properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(COLUMN_FAMILY);
+ properties.add(BATCH_SIZE);
+ properties.add(COMPLEX_FIELD_STRATEGY);
+ properties.add(FIELD_ENCODING_STRATEGY);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
+ for (PutFlowFile put : flowFiles) {
+ columns += put.getColumns().size();
+ }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
          + final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
          +
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
+ if (startIndexStr != null) {
+ startIndex = Integer.parseInt(startIndexStr);
+ }
          +
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
          + flowFiles.add(putFlowFile);
          + index++;
          +
+ if (flowFiles.size() == batchSize) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ flowFiles = new ArrayList<>();
+ }
+ }
+ if (flowFiles.size() > 0) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Failed to put records to HBase.", ex);
+ failed = true;
+ }
+
+ if (!failed) {
+ sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ flowFile = session.removeAttribute(flowFile, "restart.index");
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ String restartIndex = Integer.toString(index - flowFiles.size());
+ flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
+ sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
          +
          +
          +
          + session.commit();
          + }
          +
+ private void sendProvenance(ProcessSession session, FlowFile flowFile, int columns, long time, PutFlowFile pff) {
+ final String details = String.format("Put %d cells to HBase.", columns);
+ session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time);
+ }
+
+ @Override
+ protected String getTransitUri(PutFlowFile putFlowFile) {
+ return "hbase://" + putFlowFile.getTableName();
+ }
+
+
+ @Override
+ protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+ return null;
+ }
          +
          + protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString, String complexFieldStrategy) {
          +
          + byte[] retVal;
          +
+ if (asString) {
+ retVal = clientService.toBytes(record.getAsString(field));
+ } else {
          + switch (fieldType) {
          + case BOOLEAN:
          + retVal = clientService.toBytes(record.getAsBoolean(field));
          + break;
          + case CHAR:
          + retVal = clientService.toBytes(record.getAsString(field));
          + break;
          + case DOUBLE:
          + retVal = clientService.toBytes(record.getAsDouble(field));
          + break;
          + case FLOAT:
          + retVal = clientService.toBytes(record.getAsFloat(field));
          + break;
          + case INT:
          + retVal = clientService.toBytes(record.getAsInt(field));
          + break;
          + case LONG:
          + retVal = clientService.toBytes(record.getAsLong(field));
          + break;
          + default:
          + retVal = null;
          + switch (complexFieldStrategy) {
          + case FAIL_VALUE:
+ getLogger().error("Complex value found for {}; routing to failure", new Object[] {field});
          — End diff –

          It seems like FAIL and WARN both log at the appropriate level and then break, which means they both return null from this method... do we need to do something different so FAIL routes to failure and WARN just skips over?
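For illustration, a hedged sketch (not code from this PR; the helper name and wiring are assumptions) of one way to separate the two strategies: FAIL throws so the surrounding try/catch in onTrigger marks the FlowFile failed and routes it to REL_FAILURE, while WARN only logs and skips the field.

    // Sketch only; handleComplexField is a hypothetical helper called from the default branch.
    private byte[] handleComplexField(String field, String complexFieldStrategy) {
        switch (complexFieldStrategy) {
            case FAIL_VALUE:
                // Propagating an exception lets onTrigger's catch block set failed = true,
                // so the whole FlowFile ends up on REL_FAILURE.
                throw new ProcessException("Complex value found for " + field + "; routing to failure");
            case WARN_VALUE:
                // Warn and skip: returning null means the column is simply omitted.
                getLogger().warn("Complex value found for {}; skipping field", new Object[] {field});
                return null;
            default:
                // IGNORE: skip silently.
                return null;
        }
    }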

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r130415687

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,323 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.ReadsAttribute;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
          +@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be insert into HBase")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Path")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_FIELD_NAME);
+ properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(COLUMN_FAMILY);
+ properties.add(BATCH_SIZE);
+ properties.add(COMPLEX_FIELD_STRATEGY);
+ properties.add(FIELD_ENCODING_STRATEGY);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
+ for (PutFlowFile put : flowFiles) {
+ columns += put.getColumns().size();
+ }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
          + final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
          +
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
+ if (startIndexStr != null) {
+ startIndex = Integer.parseInt(startIndexStr);
+ }
          +
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
          + flowFiles.add(putFlowFile);
          + index++;
          +
+ if (flowFiles.size() == batchSize) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ flowFiles = new ArrayList<>();
+ }
+ }
+ if (flowFiles.size() > 0) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Failed to put records to HBase.", ex);
+ failed = true;
+ }
+
+ if (!failed) {
+ sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ flowFile = session.removeAttribute(flowFile, "restart.index");
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
          + String restartIndex = Integer.toString(index - flowFiles.size());
          + flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
          + sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
          — End diff –

          Should we wrap this in a conditional like "if columns > 0" then send provenance?

          This would stop us from reporting a provenance event for cases where we didn't send anything successfully.
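For illustration, a minimal sketch of that guard around the existing call; the call itself is taken verbatim from the diff above.

    // Only report a provenance SEND event if at least one cell actually reached HBase.
    if (columns > 0) {
        sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
    }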

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1961#discussion_r130873950

          — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java —
          @@ -0,0 +1,323 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.hbase;
          +
          +import org.apache.nifi.annotation.behavior.EventDriven;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.ReadsAttribute;
          +import org.apache.nifi.annotation.behavior.SupportsBatching;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.components.AllowableValue;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.hbase.put.PutColumn;
          +import org.apache.nifi.hbase.put.PutFlowFile;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.serialization.RecordReader;
          +import org.apache.nifi.serialization.RecordReaderFactory;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordFieldType;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Set;
          +
          +@EventDriven
          +@SupportsBatching
          +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
          +@Tags({"hadoop", "hbase", "put", "record"})
          +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
          +@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
          +@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be inserted into HBase.")
          +public class PutHBaseRecord extends AbstractPutHBase {
          +
          + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
          + .name("Row Identifier Field Path")
          + .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + protected static final String FAIL_VALUE = "Fail";
          + protected static final String WARN_VALUE = "Warn";
          + protected static final String IGNORE_VALUE = "Ignore";
          + protected static final String TEXT_VALUE = "Text";
          +
          + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
          + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
          + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
          +
          + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
          + .name("record-reader")
          + .displayName("Record Reader")
          + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
          + .identifiesControllerService(RecordReaderFactory.class)
          + .required(true)
          + .build();
          +
          + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Complex Field Strategy")
          + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
          + .expressionLanguageSupported(false)
          + .required(true)
          + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
          + .defaultValue(COMPLEX_FIELD_TEXT.getValue())
          + .build();
          +
          +
          + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
          + "Stores the value of each field as a UTF-8 String.");
          + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
          + "Stores the value of each field as the byte representation of the type derived from the record.");
          +
          + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
          + .name("Field Encoding Strategy")
          + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
          + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
          + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
          + "byte representation of that integer."))
          + .required(true)
          + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
          + .defaultValue(FIELD_ENCODING_STRING.getValue())
          + .build();
          +
          + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
          + .name("Batch Size")
          + .description("The maximum number of records to be sent to HBase at any one time from the record set.")
          + .required(true)
          + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
          + .defaultValue("1000")
          + .build();
          +
          + @Override
          + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          + final List<PropertyDescriptor> properties = new ArrayList<>();
          + properties.add(RECORD_READER_FACTORY);
          + properties.add(HBASE_CLIENT_SERVICE);
          + properties.add(TABLE_NAME);
          + properties.add(ROW_FIELD_NAME);
          + properties.add(ROW_ID_ENCODING_STRATEGY);
          + properties.add(COLUMN_FAMILY);
          + properties.add(BATCH_SIZE);
          + properties.add(COMPLEX_FIELD_STRATEGY);
          + properties.add(FIELD_ENCODING_STRATEGY);
          + return properties;
          + }
          +
          + @Override
          + public Set<Relationship> getRelationships() {
          + final Set<Relationship> rels = new HashSet<>();
          + rels.add(REL_SUCCESS);
          + rels.add(REL_FAILURE);
          + return rels;
          + }
          +
          + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
          + int columns = 0;
          + clientService.put(tableName, flowFiles);
          + for (PutFlowFile put : flowFiles) {
          + columns += put.getColumns().size();
          + }
          +
          + return columns;
          + }
          +
          + private RecordReaderFactory recordParserFactory;
          + @Override
          + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          + final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
          + FlowFile flowFile = session.get();
          + if (flowFile == null) {
          + return;
          + }
          +
          + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
          + .asControllerService(RecordReaderFactory.class);
          + List<PutFlowFile> flowFiles = new ArrayList<>();
          + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
          + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
          + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
          + final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
          + final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
          +
          + final long start = System.nanoTime();
          + int index = 0;
          + int columns = 0;
          + boolean failed = false;
          + String startIndexStr = flowFile.getAttribute("restart.index");
          + int startIndex = -1;
          + if (startIndexStr != null) {
          + startIndex = Integer.parseInt(startIndexStr);
          + }
          +
          + PutFlowFile last = null;
          + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
          + Record record;
          + if (startIndex >= 0) {
          + while ( index++ < startIndex && (reader.nextRecord()) != null) {}
          + }
          +
          + while ((record = reader.nextRecord()) != null) {
          + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
          + flowFiles.add(putFlowFile);
          + index++;
          +
          + if (flowFiles.size() == batchSize) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + flowFiles = new ArrayList<>();
          + }
          + }
          + if (flowFiles.size() > 0) {
          + columns += addBatch(tableName, flowFiles);
          + last = flowFiles.get(flowFiles.size() - 1);
          + }
          + } catch (Exception ex) {
          + getLogger().error("Failed to put records to HBase.", ex);
          + failed = true;
          + }
          +
          + if (!failed) {
          + sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
          + flowFile = session.removeAttribute(flowFile, "restart.index");
          + session.transfer(flowFile, REL_SUCCESS);
          + } else {
          + String restartIndex = Integer.toString(index - flowFiles.size());
          + flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
          + sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
          + flowFile = session.penalize(flowFile);
          + session.transfer(flowFile, REL_FAILURE);
          + }

          +
          +
          +
          + session.commit();
          + }
          +
          + private void sendProvenance(ProcessSession session, FlowFile flowFile, int columns, long time, PutFlowFile pff) {
          + final String details = String.format("Put %d cells to HBase.", columns);
          + session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time);
          + }
          +
          + @Override
          + protected String getTransitUri(PutFlowFile putFlowFile) {
          + return "hbase://" + putFlowFile.getTableName();
          + }
          +
          +
          + @Override
          + protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
          + return null;
          + }
          +
          + protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString, String complexFieldStrategy) {
          +
          + byte[] retVal;
          +
          + if (asString) {
          + retVal = clientService.toBytes(record.getAsString(field));
          + } else {
          + switch (fieldType) {
          + case BOOLEAN:
          + retVal = clientService.toBytes(record.getAsBoolean(field));
          + break;
          + case CHAR:
          + retVal = clientService.toBytes(record.getAsString(field));
          + break;
          + case DOUBLE:
          + retVal = clientService.toBytes(record.getAsDouble(field));
          + break;
          + case FLOAT:
          + retVal = clientService.toBytes(record.getAsFloat(field));
          + break;
          + case INT:
          + retVal = clientService.toBytes(record.getAsInt(field));
          + break;
          + case LONG:
          + retVal = clientService.toBytes(record.getAsLong(field));
          + break;
          + default:
          + retVal = null;
          + switch (complexFieldStrategy) {
          + case FAIL_VALUE:
          + getLogger().error("Complex value found for {}; routing to failure", new Object[] {field});
          — End diff –

          Thanks for the feedback @bbende I'll address these today.

          Show
          githubbot ASF GitHub Bot added a comment - Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/1961#discussion_r130873950 — Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java — End diff – Thanks for the feedback @bbende I'll address these today.
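          To illustrate the Field Encoding Strategy described in the diff above: the default String strategy stores each value as its UTF-8 text, while the Bytes strategy stores the byte representation of the underlying type. A rough example using the HBase Bytes utility (values are hypothetical, not part of the patch):

          byte[] asText = org.apache.hadoop.hbase.util.Bytes.toBytes("1234"); // UTF-8 characters: 0x31 0x32 0x33 0x34
          byte[] asInt  = org.apache.hadoop.hbase.util.Bytes.toBytes(1234);   // 4-byte big-endian int: 0x00 0x00 0x04 0xD2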
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on the issue:

          https://github.com/apache/nifi/pull/1961

          @bbende Done.

          Show
          githubbot ASF GitHub Bot added a comment - Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/1961 @bbende Done.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1961

          Thanks @MikeThomsen , two minor issues while testing this...

          1) There is a try-with-resource block that creates the RecordReader, and as part of that line it also creates an InputStream for the flow file, but since the InputStream is created inline it won't be auto-closed which can cause a problem later on.

          2) The complex field handling was only being used in the case where the field encoding strategy was "Bytes", which is different from how PutHBaseJson works.

          I went ahead and addressed these issues in my own branch:
          https://github.com/bbende/nifi/commit/a9d8d038fe40b50bab1a8b59e11ad926ed07cc78

          If you are on-board with my changes then I'll go ahead and merge everything to master.

          Let me know.

          Show
          githubbot ASF GitHub Bot added a comment - Github user bbende commented on the issue: https://github.com/apache/nifi/pull/1961 Thanks @MikeThomsen , two minor issues while testing this... 1) There is a try-with-resource block that creates the RecordReader, and as part of that line it also creates an InputStream for the flow file, but since the InputStream is created inline it won't be auto-closed which can cause a problem later on. 2) The complex field handling was only being used in the case where the field encoding strategy was "Bytes", which is different from how PutHBaseJson works. I went ahead and addressed these issues in my own branch: https://github.com/bbende/nifi/commit/a9d8d038fe40b50bab1a8b59e11ad926ed07cc78 If you are on-board with my changes then I'll go ahead and merge everything to master. Let me know.
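          For illustration, issue 1 can be addressed by declaring the InputStream as its own resource in the try-with-resources header so both it and the reader are auto-closed; a sketch, not necessarily identical to the change in the linked branch:

          try (final InputStream in = session.read(flowFile);
               final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
              // record-by-record processing as in the diff above; both `in` and `reader`
              // are closed automatically when this block exits, even on failure
          }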
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user MikeThomsen commented on the issue:

          https://github.com/apache/nifi/pull/1961

          Looks good to me.

          Show
          githubbot ASF GitHub Bot added a comment - Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/1961 Looks good to me.
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit 496a32e12c4da869996ccee4c1dcf797b229cec3 in nifi's branch refs/heads/master from Mike Thomsen
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=496a32e ]

          NIFI-4024 Added org.apache.nifi.hbase.PutHBaseRecord

          Signed-off-by: Bryan Bende <bbende@apache.org>

          Show
          jira-bot ASF subversion and git services added a comment - Commit 496a32e12c4da869996ccee4c1dcf797b229cec3 in nifi's branch refs/heads/master from Mike Thomsen [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=496a32e ] NIFI-4024 Added org.apache.nifi.hbase.PutHBaseRecord Signed-off-by: Bryan Bende <bbende@apache.org>
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit f8f1cc8d0df4893f35d7cbe3413b7a3dd521bd18 in nifi's branch refs/heads/master from Bryan Bende
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=f8f1cc8 ]

          NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling

          This closes #1961.

          Signed-off-by: Bryan Bende <bbende@apache.org>

          Show
          jira-bot ASF subversion and git services added a comment - Commit f8f1cc8d0df4893f35d7cbe3413b7a3dd521bd18 in nifi's branch refs/heads/master from Bryan Bende [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=f8f1cc8 ] NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling This closes #1961. Signed-off-by: Bryan Bende <bbende@apache.org>
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1961

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/1961
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1961

          Merged to master, thanks!

          Show
          githubbot ASF GitHub Bot added a comment - Github user bbende commented on the issue: https://github.com/apache/nifi/pull/1961 Merged to master, thanks!

            People

            • Assignee:
              Unassigned
              Reporter:
              champagst Steve Champagne
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development