One of the scenarios for ProcessSession::write was broken after recent code refactoring within the following pull request: https://github.com/apache/nifi/pull/7363/files
The issue is located in StandardContentClaimWriteCache.java in the write(final ContentClaim claim) method that returns an OutputStream used in the OutputStreamCallback interface to let NiFi processors write flowfile content through the ProcessSession::write method.
If a processor calls session.write but does not write any data to the output stream, none of the write methods of the OutputStream is invoked, so the length of the content claim is never recomputed and keeps its default value of -1. Because the refactoring creates a new content claim on each ProcessSession::write invocation, the following formula gives the wrong result:
or as in the codebase:
For example, if the previous offset was 1000 and nothing was written to the stream (length is -1), then 1000 + (-1) gives 999, so the offset is shifted back by one. As a result, the next content has an extra character from the previous content at the beginning and loses its last character at the end. All other FlowFiles anywhere in NiFi are corrupted by this defect until the NiFi instance is restarted.
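The arithmetic above can be reproduced in isolation. The sketch below is not NiFi code: the class name, the method names, and the `Math.max` guard are hypothetical and only illustrate how an unset length of -1 moves the offset backwards, and how treating an unset length as 0 bytes would avoid the shift.

```java
public class ClaimOffsetSketch {
    // Default length of a content claim that was never written to, as described in this report.
    public static final long UNSET_LENGTH = -1L;

    // Naive computation: adds the length as-is, so an unset length of -1 shifts the offset back by one.
    public static long naiveNextOffset(long offset, long length) {
        return offset + length;
    }

    // Hypothetical guard (an assumption, not the actual NiFi fix): count an unset length as 0 bytes.
    public static long guardedNextOffset(long offset, long length) {
        return offset + Math.max(0L, length);
    }

    public static void main(String[] args) {
        System.out.println(naiveNextOffset(1000L, UNSET_LENGTH));   // 999
        System.out.println(guardedNextOffset(1000L, UNSET_LENGTH)); // 1000
    }
}
```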
The following steps can be taken to reproduce the issue (critical in our commercial project):
- Create an empty text file (“a.txt”);
- Create a text file with any text (“b.txt”);
- Package these files into a .zip archive;
- Put it into a file system on Azure Cloud (we use ADLS Gen2);
- Read the zip file and unpack its content on the NiFi Canvas using the FetchAzureDataLakeStorage and UnpackContent processors;
- Start a flow with the GenerateFlowFile processor.
The empty file must be extracted before the non-empty file, otherwise the issue won’t reproduce. The content of the second FlowFile will be corrupted: its first character is an unreadable character from the zip archive (the last character of the zip content fetched with FetchAzureDataLakeStorage), and its last character is lost. From this point on, NiFi cannot be used at all, because any other processor will corrupt FlowFile content across the entire NiFi instance due to the shifted offset.
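The corruption pattern described above (one leading byte from the previous content, one trailing byte lost) can be simulated with a plain byte array. This is a simplified sketch, not NiFi code: the class, the helper methods, and the placeholder bytes "ZIPDATA#" standing in for the zip content are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ShiftedReadSketch {
    // Reads `length` bytes starting at `offset` from a shared backing store.
    public static String read(byte[] store, int offset, int length) {
        return new String(Arrays.copyOfRange(store, offset, offset + length), StandardCharsets.US_ASCII);
    }

    // Appends `next` directly after `previous`, as two contents stored back to back.
    public static byte[] concat(byte[] previous, byte[] next) {
        byte[] store = Arrays.copyOf(previous, previous.length + next.length);
        System.arraycopy(next, 0, store, previous.length, next.length);
        return store;
    }

    public static void main(String[] args) {
        byte[] previous = "ZIPDATA#".getBytes(StandardCharsets.US_ASCII); // placeholder for the zip bytes
        byte[] next = "Hello world".getBytes(StandardCharsets.US_ASCII);
        byte[] store = concat(previous, next);

        int correctOffset = previous.length;   // where the second content really starts
        int shiftedOffset = correctOffset - 1; // offset after adding a length of -1

        System.out.println(read(store, correctOffset, next.length)); // Hello world
        System.out.println(read(store, shiftedOffset, next.length)); // #Hello worl
    }
}
```

Reading the same number of bytes from an offset that is one too small picks up the last byte of the preceding content and drops the final byte of the intended content.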
A sample canvas:
Important note: the issue is not reproducible if the empty file is the last file to be extracted (the length will be reset when the processor completes), or if you do not call session.write() for a file of 0 bytes (for example, in a custom processor with such logic).
The offsets for the above picture look as follows (#1 - after fetching and unpacking the empty file, #2 - before unpacking the second file):
1524 - after FetchAzureDataLakeStorage and UnpackContent for the empty file. The length -1 is kept instead of 0 and used for the next file, which is why the next offset is 1523 (1524 + (-1) = 1523).
If your file contains the text "Hello world", then after downloading this unpacked file from NiFi you'll see (the first character here is a space):
Different processors will report various errors due to the corrupted content, especially for the JSON format and queries: