Uploaded image for project: 'Apache NiFi'
  1. Apache NiFi
  2. NIFI-5879

ContentNotFoundException thrown if a FlowFile's content claim is read, then written to, then read again, within the same ProcessSession

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.9.0
    • Core Framework
    • None

    Description

      The following Processor can be used to replicate the issue.

      If a processor reads a FlowFile's content, then writes to that content, then attempts to read what was just written, all within the same ProcessSession, a ContentNotFoundException is thrown.

       

      /*
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements. See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License. You may obtain a copy of the License at
       *
       *     http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
        package org.apache.nifi.processors.standard;

      import org.apache.nifi.components.PropertyDescriptor;
      import org.apache.nifi.components.PropertyDescriptor.Builder;
      import org.apache.nifi.flowfile.FlowFile;
      import org.apache.nifi.processor.AbstractProcessor;
      import org.apache.nifi.processor.ProcessContext;
      import org.apache.nifi.processor.ProcessSession;
      import org.apache.nifi.processor.Relationship;
      import org.apache.nifi.processor.exception.ProcessException;
      import org.apache.nifi.stream.io.StreamUtils;

      import java.io.IOException;
      import java.io.InputStream;
      import java.nio.charset.StandardCharsets;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.Set;

      import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
      import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;

      /**
       * Reproduction processor for NIFI-5879: within a single {@link ProcessSession} it
       * reads a FlowFile's content, writes to it, and then reads it again.
       *
       * <p>For every incoming FlowFile the processor logs the original content length,
       * creates the configured number of clones, and for each clone performs the
       * configured number of write passes (optionally preceded by a read). All clones
       * and the original are routed to {@code success}.
       */
      public class ReplicateWeirdness extends AbstractProcessor {

          /** How many clones of the incoming FlowFile are created per trigger. */
          static final PropertyDescriptor CLONE_ITERATIONS = new Builder()
                  .name("Iterations")
                  .displayName("Iterations")
                  .description("Number of Iterations")
                  .required(true)
                  .addValidator(POSITIVE_INTEGER_VALIDATOR)
                  .expressionLanguageSupported(NONE)
                  .defaultValue("1")
                  .build();

          /** How many write passes are applied to each clone. */
          static final PropertyDescriptor WRITE_ITERATIONS = new Builder()
                  .name("Write Iterations")
                  .displayName("Write Iterations")
                  .description("Write Iterations")
                  .required(true)
                  .addValidator(POSITIVE_INTEGER_VALIDATOR)
                  .expressionLanguageSupported(NONE)
                  .defaultValue("2")
                  .build();

          /** Whether each write pass is preceded by a full read of the clone's content. */
          static final PropertyDescriptor READ_FIRST = new Builder()
                  .name("Read First")
                  .displayName("Read First")
                  .description("Read First")
                  .required(true)
                  .allowableValues("true", "false")
                  .expressionLanguageSupported(NONE)
                  .defaultValue("false")
                  .build();

          /** The only relationship; the original and every clone are sent here. */
          static final Relationship REL_SUCCESS = new Relationship.Builder()
                  .name("success")
                  .build();

          /**
           * @return a singleton set containing {@link #REL_SUCCESS}
           */
          @Override
          public Set<Relationship> getRelationships() {
              return Collections.singleton(REL_SUCCESS);
          }

          /**
           * @return the three supported properties, in display order
           */
          @Override
          protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
              final List<PropertyDescriptor> properties = new ArrayList<>();
              properties.add(CLONE_ITERATIONS);
              properties.add(WRITE_ITERATIONS);
              properties.add(READ_FIRST);
              return properties;
          }

          /**
           * Reads the incoming FlowFile, then clones it and repeatedly writes to (and
           * optionally reads from) each clone before transferring everything to
           * {@code success}.
           *
           * @param context provides the configured property values
           * @param session the session used for all reads, writes, clones and transfers
           * @throws ProcessException if reading FlowFile content fails with an {@link IOException}
           */
          @Override
          public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
              FlowFile original = session.get();
              if (original == null) {
                  return;
              }

              // Initial read of the original content, before any clone is written to.
              try (final InputStream in = session.read(original)) {
                  final long originalLength = countBytes(in);
                  getLogger().info("Original FlowFile is " + originalLength + " bytes");
              } catch (final IOException e) {
                  throw new ProcessException(e);
              }

              final int cloneIterations = context.getProperty(CLONE_ITERATIONS).asInteger();
              final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
              final boolean readFirst = context.getProperty(READ_FIRST).asBoolean();

              for (int i = 0; i < cloneIterations; i++) {
                  FlowFile clone = session.clone(original);

                  for (int w = 0; w < writeIterations; w++) {
                      if (readFirst) {
                          try (InputStream in = session.read(clone)) {
                              final long len = countBytes(in);
                              getLogger().info("Read " + len + " bytes");
                          } catch (IOException e) {
                              throw new ProcessException(e);
                          }
                      }

                      // First write replaces the content; the explicit charset keeps the
                      // written bytes independent of the platform default.
                      clone = session.write(clone, out -> out.write("boom".getBytes(StandardCharsets.UTF_8)));
                      // Second write is handed StreamUtils::copy, so (presumably) it reads
                      // the content just written and copies it to the new content.
                      clone = session.write(clone, StreamUtils::copy);
                  }

                  session.transfer(clone, REL_SUCCESS);
              }

              session.transfer(original, REL_SUCCESS);
          }

          /**
           * Consumes the stream to its end and counts the bytes read.
           *
           * @param in the stream to drain; it is not closed by this method
           * @return the number of bytes read before end-of-stream
           * @throws IOException if reading from the stream fails
           */
          private long countBytes(final InputStream in) throws IOException {
              // long, not int: the return type is long and content may exceed Integer.MAX_VALUE bytes.
              long len = 0L;
              while (in.read() >= 0) {
                  len++;
              }

              return len;
          }
      }

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            markap14 Mark Payne
            markap14 Mark Payne
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 20m
                20m

                Slack

                  Issue deployment