Uploaded image for project: 'Apache NiFi'
  1. Apache NiFi
  2. NIFI-4633

ProcessSession.clone(FlowFile) throws FlowFileHandlingException incorrectly

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.4.0
    • 1.5.0
    • None
    • None

    Description

      A message was received on the dev@ mailing list with the following situation:

      Hello,

      In updating our Nifi setup to 1.4.0, one of our regression tests started to
      fail.

      On investigation I ran into an issue with FlowFile state that has confused
      me.

      The following processor takes an input flow file, writes "NEW_DATA" to it,
      the clones it.
      It then outputs the original flowFile and the clone to different output
      relationships.
      I had expected that the two outputs would be the same, but if I run this
      through (using GenerateFlowFile to generate input), I get different output.

      If I play in data "OLD", then:
      OutputA receives a flowFile with the written content "NEW_DATA".
      OutputB receives the original content "OLD" that was sent into the
      processor.

      If instead I play in data longer than 8 bytes (ie longer than "NEW_DATA"),
      then CloneTestProcess errors and yields.
      The error from the logs is below.

      Any advice would be appreciated.

      Thanks in advance,
      Matthew Watson

      Code:

      public class CloneTestProcessor extends AbstractProcessor {
      
         public static final Relationship OUTPUT_A = new
      Relationship.Builder().name("OUTPUT_A").build();
         public static final Relationship OUTPUT_B = new
      Relationship.Builder().name("OUTPUT_B").build();
      
         @Override
         public Set<Relationship> getRelationships() {
             return ImmutableSet.of(OUTPUT_A, OUTPUT_B);
         }
      
         @Override
         public void onTrigger(ProcessContext context, ProcessSession
      session) throws ProcessException {
      
             String DATA = "NEW_DATA";
      
             FlowFile flowFile = session.get();
             if(flowFile == null) {
                 return;
             }
      
             FlowFile output = session.write(flowFile, (in, out) -> {
                 out.write(DATA.getBytes());
             });
      
             FlowFile other = session.clone(flowFile);
             session.transfer(flowFile, OUTPUT_A);
             session.transfer(other, OUTPUT_B);
         }
      }
      

      Error:

      2017-11-22 16:56:19,326 WARN [Timer-Driven Process Thread-10]
      o.a.n.c.t.ContinuallyRunProcessorTask
      org.apache.nifi.processor.exception.FlowFileHandlingException: Specified
      offset of 0 and size 9 exceeds size of
      StandardFlowFileRecord[uuid=5d279723-64a3-4ad2-a8f6-3f974baf38ce,claim=StandardContentClaim
      [resourceClaim=StandardResourceClaim[id=1511369750666-1, container=default,
      section=1], offset=51, length=8],offset=0,name=29102663649743,size=8]
             at
      org.apache.nifi.controller.repository.StandardProcessSession.clone(StandardProcessSession.java:1672)
             at
      org.apache.nifi.controller.repository.StandardProcessSession.clone(StandardProcessSession.java:1662)
             at
      com.baesystemsai.nifi.utility.CloneTestProcessor.onTrigger(CloneTestProcessor.java:59)
             at
      org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
             at
      org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
             at
      org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
             at
      org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
             at
      org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
             at
      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
             at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
             at
      java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
             at
      java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
             at
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
             at
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
             at java.lang.Thread.run(Thread.java:745)
      

      Attachments

        Issue Links

          Activity

            People

              markap14 Mark Payne
              markap14 Mark Payne
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: