Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
Description
The following Processor can be used to replicate the issue.
If a processor reads content, then attempts to write to the content, then read what was just written, a ContentNotFoundException will be thrown.
/*
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
* - http://www.apache.org/licenses/LICENSE-2.0
* - Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
public class ReplicateWeirdness extends AbstractProcessor {
static final PropertyDescriptor CLONE_ITERATIONS = new Builder()
.name("Iterations")
.displayName("Iterations")
.description("Number of Iterations")
.required(true)
.addValidator(POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(NONE)
.defaultValue("1")
.build();
static final PropertyDescriptor WRITE_ITERATIONS = new Builder()
.name("Write Iterations")
.displayName("Write Iterations")
.description("Write Iterations")
.required(true)
.addValidator(POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(NONE)
.defaultValue("2")
.build();
static final PropertyDescriptor READ_FIRST = new Builder()
.name("Read First")
.displayName("Read First")
.description("Read First")
.required(true)
.allowableValues("true", "false")
.expressionLanguageSupported(NONE)
.defaultValue("false")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.build();
@Override
public Set<Relationship> getRelationships()
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors()
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile original = session.get();
if (original == null)
try (final InputStream in = session.read(original))
{ final long originalLength = countBytes(in); getLogger().info("Original FlowFile is " + originalLength + " bytes"); }catch (final IOException e)
{ throw new ProcessException(e); }final int cloneIterations = context.getProperty(CLONE_ITERATIONS).asInteger();
final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
final boolean readFirst = context.getProperty(READ_FIRST).asBoolean();
for (int i=0; i < cloneIterations; i++) {
FlowFile clone = session.clone(original);
for (int w = 0; w < writeIterations; w++) {
if (readFirst) {
try (InputStream in = session.read(clone)) { final long len = countBytes(in); getLogger().info("Read " + len + " bytes"); } catch (IOException e) { throw new ProcessException(e); }
}
clone = session.write(clone, out -> out.write("boom".getBytes()));
clone = session.write(clone, StreamUtils::copy);
}
session.transfer(clone, REL_SUCCESS);
}
session.transfer(original, REL_SUCCESS);
}
private long countBytes(final InputStream in) throws IOException {
int len = 0;
while (in.read() >= 0)
return len;
}
}