Apache NiFi / NIFI-3356

Provide a newly refactored provenance repository

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0
    • Component/s: Core Framework
    • Labels: None

      Description

      The Persistent Provenance Repository has been redesigned a few different times over several years. The original design for the repository was to provide storage of events and sequential iteration over those events via a Reporting Task. After that, we added the ability to compress the data so that it could be held longer. We then introduced the notion of indexing and searching via Lucene. We've since made several more modifications to try to boost performance.

      At this point, however, the repository is still the bottleneck for many flows that handle large volumes of small FlowFiles. We need a new implementation that is based around the current goals for the repository and that can provide better throughput.


          Activity

          ozhurakousky Oleg Zhurakousky added a comment -

          Mark, obviously this is too large a change to give it the thorough review it deserves, but it is a much-needed change nevertheless, and we did have a good discussion in the PR. So it is merged/resolved. Whatever we may have missed can be addressed in a separate JIRA in the future.

          jira-bot ASF subversion and git services added a comment -

          Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ]

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor changes were made to improve efficiency as well, as they came to light during testing:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.
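          Two of the bullets above (the ClaimCache-style buffering in StandardProcessSession and the MinimalLockingWriteAheadLog threading change) share a common pattern: buffer many small writes instead of hitting the file stream directly, and serialize each record outside the lock so the lock guards only the cheap byte append. A minimal sketch of that pattern is below; the class and method names are illustrative, not NiFi's actual API.

```java
// Hypothetical sketch (not NiFi's actual classes) of the pattern described above:
// buffered appends plus serialization performed OUTSIDE the shared lock.
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class WriteAheadSketch {
    private final OutputStream out;                 // shared, buffered log stream
    private final Object writeLock = new Object();

    public WriteAheadSketch(OutputStream raw) {
        // Buffer small appends so each record does not become its own write syscall
        this.out = new BufferedOutputStream(raw);
    }

    /** Serialize with no lock held; take the lock only for the raw byte append. */
    public void append(String record) throws IOException {
        byte[] serialized = (record + "\n").getBytes(StandardCharsets.UTF_8);
        synchronized (writeLock) {
            out.write(serialized);                  // short critical section
        }
    }

    /** Writes {@code count} records from 4 threads; returns complete lines written. */
    public static int demo(int count) throws Exception {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WriteAheadSketch log = new WriteAheadSketch(sink);
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int id = t;
            writers[t] = new Thread(() -> {
                try {
                    for (int i = 0; i < count / 4; i++) {
                        log.append("event-" + id + "-" + i);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join();
        }
        log.out.flush();                            // push buffered bytes to the sink
        return sink.toString(StandardCharsets.UTF_8).split("\n").length;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(100));              // every record arrives intact
    }
}
```

          Because each record is fully serialized into a private buffer before the lock is taken, contention on the shared stream is limited to a memory copy, which is what lets many threads feed one log without corrupting interleaved records.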

          NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string

          NIFI-3356: Fixed NPE

          NIFI-3356: Added debug-level performance monitoring

          NIFI-3356: Updates to unit tests that failed after rebasing against master

          NIFI-3356: Incorporated PR review feedback

          NIFI-3356: Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging

          This closes #1493
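          For reference, the repository introduced by this commit ships in NiFi 1.2.0 as an opt-in implementation selected in nifi.properties. The fragment below shows the relevant keys as documented for that release; verify the exact names and defaults against the Administration Guide for your version.

```properties
# Select the new write-ahead implementation (NiFi 1.2.0+);
# the previous default was org.apache.nifi.provenance.PersistentProvenanceRepository.
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.directory.default=./provenance_repository

# The bored-yield duration referenced above; with this in place, the minimum
# component scheduling period could drop from 30 microseconds to 1 nanosecond.
nifi.bored.yield.duration=10 millis
```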

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1493

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/1493
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ]

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.

          NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string

          NIFI-3356: Fixed NPE

          NIFI-3356: Added debug-level performance monitoring

          NIFI-3356: Updates to unit tests that failed after rebasing against master

          NIFI-3356: Incorporated PR review feedback

          NIFI-3356: Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging

          This closes #1493

          Show
          jira-bot ASF subversion and git services added a comment - Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ] NIFI-3356 : Initial implementation of writeahead provenance repository The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository, a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository: Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield when processors have no work, this results in slowing down processors that are able to perform work. Allow nifi.properties to specify multiple directories for FlowFile Repository If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the herky-jerky queuing that we previously saw at very high rates of FlowFiles. Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information. 
NIFI-3356 : Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string NIFI-3356 : Fixed NPE NIFI-3356 : Added debug-level performance monitoring NIFI-3356 : Updates to unit tests that failed after rebasing against master NIFI-3356 : Incorporated PR review feedback NIFI-3356 : Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging This closes #1493
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ]

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.

          NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string

          NIFI-3356: Fixed NPE

          NIFI-3356: Added debug-level performance monitoring

          NIFI-3356: Updates to unit tests that failed after rebasing against master

          NIFI-3356: Incorporated PR review feedback

          NIFI-3356: Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging

          This closes #1493

          Show
          jira-bot ASF subversion and git services added a comment - Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ] NIFI-3356 : Initial implementation of writeahead provenance repository The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository, a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository: Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield when processors have no work, this results in slowing down processors that are able to perform work. Allow nifi.properties to specify multiple directories for FlowFile Repository If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the herky-jerky queuing that we previously saw at very high rates of FlowFiles. Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information. 
NIFI-3356 : Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string NIFI-3356 : Fixed NPE NIFI-3356 : Added debug-level performance monitoring NIFI-3356 : Updates to unit tests that failed after rebasing against master NIFI-3356 : Incorporated PR review feedback NIFI-3356 : Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging This closes #1493
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ]

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.

          NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string

          NIFI-3356: Fixed NPE

          NIFI-3356: Added debug-level performance monitoring

          NIFI-3356: Updates to unit tests that failed after rebasing against master

          NIFI-3356: Incorporated PR review feedback

          NIFI-3356: Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging

          This closes #1493

          Show
          jira-bot ASF subversion and git services added a comment - Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ] NIFI-3356 : Initial implementation of writeahead provenance repository The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository, a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository: Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield when processors have no work, this results in slowing down processors that are able to perform work. Allow nifi.properties to specify multiple directories for FlowFile Repository If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the herky-jerky queuing that we previously saw at very high rates of FlowFiles. Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information. 
NIFI-3356 : Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string NIFI-3356 : Fixed NPE NIFI-3356 : Added debug-level performance monitoring NIFI-3356 : Updates to unit tests that failed after rebasing against master NIFI-3356 : Incorporated PR review feedback NIFI-3356 : Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging This closes #1493
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit 96ed405d708894ee5400ebbdbf335325219faa09 in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=96ed405 ]

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.

          NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption and fixed bug in RepositoryConfiguration that threw exception if cache warm duration was set to empty string

          NIFI-3356: Fixed NPE

          NIFI-3356: Added debug-level performance monitoring

          NIFI-3356: Updates to unit tests that failed after rebasing against master

          NIFI-3356: Incorporated PR review feedback

          NIFI-3356: Fixed bug where we would delete index directories that are still in use; also added additional debug logging and a simple util class that can be used to textualize provenance event files - useful in debugging

          This closes #1493

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101399653

          — Diff: nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java —
          @@ -0,0 +1,88 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.provenance;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +
          +/**
          + * Provides a mechanism for obtaining the identifiers of components, queues, etc.
          + */
          +public interface IdentifierLookup {
          +
          + /**
          + * @return the identifiers of components that may generate Provenance Events
          + */
          + List<String> getComponentIdentifiers();
          +
          + /**
          + * @return a list of component types that may generate Provenance Events
          + */
          + List<String> getComponentTypes();
          +
          + /**
          + *
          + * @return the identifiers of FlowFile Queues that are in the flow
          + */
          + List<String> getQueueIdentifiers();
          +
          +    default Map<String, Integer> invertQueueIdentifiers() {
          +        return invertList(getQueueIdentifiers());
          +    }
          +
          +    default Map<String, Integer> invertComponentTypes() {
          +        return invertList(getComponentTypes());
          +    }
          +
          +    default Map<String, Integer> invertComponentIdentifiers() {
          +        return invertList(getComponentIdentifiers());
          +    }
          +
          +    default Map<String, Integer> invertList(final List<String> values) {
          — End diff –

          That is true. Should not be an issue, though, since these values are all expected to be unique identifiers.
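The diff hunk quoted here cuts off before the body of invertList(), but the behavior being debated is easy to demonstrate with a standalone sketch. The class name below is hypothetical and this is not the actual default implementation; it simply shows the shape of the inversion (value → list index) and what happens with the duplicate entries raised in this review thread: duplicates collapse to a single key, and the last index wins.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a list inversion like the one discussed above.
// Each value is mapped to its index; duplicate values collapse so that
// the LAST occurrence's index overwrites earlier ones.
public class InvertListSketch {
    public static Map<String, Integer> invert(final List<String> values) {
        final Map<String, Integer> inverted = new HashMap<>();
        for (int i = 0; i < values.size(); i++) {
            inverted.put(values.get(i), i); // later duplicates overwrite earlier indices
        }
        return Collections.unmodifiableMap(inverted);
    }

    public static void main(final String[] args) {
        // With unique identifiers (the expected input), the mapping is lossless.
        System.out.println(invert(List.of("proc-1", "proc-2", "conn-1")));
        // With a duplicate, the map shrinks: "proc-1" keeps only index 2.
        System.out.println(invert(List.of("proc-1", "proc-2", "proc-1")));
    }
}
```

This makes the reviewer's point concrete: the overwrite is harmless only because the inputs (component and queue identifiers) are expected to be unique.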

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101130301

          — Diff: nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java —
          @@ -0,0 +1,88 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.provenance;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +
          +/**
          + * Provides a mechanism for obtaining the identifiers of components, queues, etc.
          + */
          +public interface IdentifierLookup {
          +
          + /**
          + * @return the identifiers of components that may generate Provenance Events
          + */
          + List<String> getComponentIdentifiers();
          +
          + /**
          + * @return a list of component types that may generate Provenance Events
          + */
          + List<String> getComponentTypes();
          +
          + /**
          + *
          + * @return the identifiers of FlowFile Queues that are in the flow
          + */
          + List<String> getQueueIdentifiers();
          +
          +    default Map<String, Integer> invertQueueIdentifiers() {
          +        return invertList(getQueueIdentifiers());
          +    }
          +
          +    default Map<String, Integer> invertComponentTypes() {
          +        return invertList(getComponentTypes());
          +    }
          +
          +    default Map<String, Integer> invertComponentIdentifiers() {
          +        return invertList(getComponentIdentifiers());
          +    }
          +
          +    default Map<String, Integer> invertList(final List<String> values) {
          — End diff –

          Obviously a List can have duplicate entries, so different indexes may correspond to the same value. Just wanted to make sure that this is acceptable.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101106345

          — Diff: nifi-docs/src/main/asciidoc/administration-guide.adoc —
          @@ -2074,7 +2074,25 @@ The Provenance Repository contains the information related to Data Provenance. T

          ====
          Property Description
          -
          nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository and should only be changed with caution. To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to org.apache.nifi.provenance.VolatileProvenanceRepository.
          +
          nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository.
          +Two additional repositories are available, and this property should only be changed with caution.
          +To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure),
          +set this property to org.apache.nifi.provenance.VolatileProvenanceRepository. This leaves a configurable number of Provenance Events in the Java heap, so the number
          +of events that can be retained is very limited. It has been used essentially as a no-op repository and is not recommended.
          — End diff –

          I can agree with that.
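For reference, the property under discussion lives in nifi.properties. A minimal sketch of the two values quoted in the doc diff above (the durable default and the in-memory alternative):

```properties
# Default: durable, on-disk provenance storage.
nifi.provenance.repository.implementation=org.apache.nifi.provenance.PersistentProvenanceRepository

# In-memory alternative: events are held on the Java heap and are lost on
# power/machine failure; essentially a no-op repository, not recommended.
# nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
```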

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101104283

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
          +public class ContentClaimWriteCache {
          + private final ContentRepository contentRepo;
          + private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
          + private final Queue<ContentClaim> queue = new LinkedList<>();
          + private final int bufferSize;
          +
          +    public ContentClaimWriteCache(final ContentRepository contentRepo) {
          +        this(contentRepo, 8192);
          +    }
          +
          +    public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) {
          +        this.contentRepo = contentRepo;
          +        this.bufferSize = bufferSize;
          +    }
          +
          +    public void reset() throws IOException {
          +        try {
          +            forEachStream(OutputStream::close);
          +        } finally {
          +            streamMap.clear();
          +            queue.clear();
          +        }
          +    }
          +
          +    public ContentClaim getContentClaim() throws IOException {
          +        final ContentClaim contentClaim = queue.poll();
          +        if (contentClaim != null) {
          +            contentRepo.incrementClaimaintCount(contentClaim);
          +            return contentClaim;
          +        }
          +
          +        final ContentClaim claim = contentRepo.create(false);
          +        registerStream(claim);
          +        return claim;
          +    }
          +
          +    private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException {
          — End diff –

          Another good catch - we can get rid of the ByteCountingOutputStream. Must have done some refactoring so that I didn't need it, and then left it. Will address.
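The core idea of the cache quoted in this diff is stream reuse: keep one buffered stream open per claim so that many small FlowFile writes avoid repeatedly reopening the underlying FileOutputStream. The sketch below is a hypothetical, simplified illustration of that pattern (names are invented, and ByteArrayOutputStream stands in for the content repository's file-backed streams); it is not the actual NiFi class.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the write-cache pattern: one open stream per
// claim, reused across writes. ByteArrayOutputStream stands in for the
// content repository's file-backed streams.
public class ClaimCacheDemo {
    static int streamsCreated = 0;
    private final Map<String, ByteArrayOutputStream> streamMap = new HashMap<>();

    public ByteArrayOutputStream streamFor(final String claimId) {
        // Reuse: a second request for the same claim returns the stream
        // that is already open rather than creating another one.
        return streamMap.computeIfAbsent(claimId, id -> {
            streamsCreated++;
            return new ByteArrayOutputStream();
        });
    }

    public void reset() {
        // The real cache closes each underlying stream here; for the
        // in-memory stand-in, clearing the map is sufficient.
        streamMap.clear();
    }

    public static void main(final String[] args) {
        final ClaimCacheDemo cache = new ClaimCacheDemo();
        cache.streamFor("claim-1").write(1);
        cache.streamFor("claim-1").write(2); // same stream as the first write
        cache.streamFor("claim-2").write(3);
        System.out.println("streams created: " + streamsCreated); // prints "streams created: 2"
        cache.reset();
    }
}
```

This also shows why the review comment above is reasonable: if nothing ever reads the byte counts, a plain buffered stream serves the purpose and the counting wrapper can be dropped.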

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101100242

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
           +public class ContentClaimWriteCache {
           +    private final ContentRepository contentRepo;
           +    private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
           +    private final Queue<ContentClaim> queue = new LinkedList<>();
           +    private final int bufferSize;
           +
           +    public ContentClaimWriteCache(final ContentRepository contentRepo) {
           +        this(contentRepo, 8192);
           +    }
           +
           +    public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) {
           +        this.contentRepo = contentRepo;
           +        this.bufferSize = bufferSize;
           +    }
           +
           +    public void reset() throws IOException {
           +        try {
           +            forEachStream(OutputStream::close);
           +        } finally {
           +            streamMap.clear();
           +            queue.clear();
           +        }
           +    }
           +
           +    public ContentClaim getContentClaim() throws IOException {
           +        final ContentClaim contentClaim = queue.poll();
           +        if (contentClaim != null) {
           +            contentRepo.incrementClaimaintCount(contentClaim);
           +            return contentClaim;
           +        }
           +
           +        final ContentClaim claim = contentRepo.create(false);
           +        registerStream(claim);
           +        return claim;
           +    }
           +
           +    private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException {
           +        final OutputStream out = contentRepo.write(contentClaim);
           +        final OutputStream buffered = new BufferedOutputStream(out, bufferSize);
           +        final ByteCountingOutputStream bcos = new ByteCountingOutputStream(buffered);
           +        streamMap.put(contentClaim.getResourceClaim(), bcos);
           +        return bcos;
           +    }
           +
           +    public OutputStream write(final ContentClaim claim) throws IOException {
           +        OutputStream out = streamMap.get(claim.getResourceClaim());
           +        if (out == null) {
           +            out = registerStream(claim);
           +        }
           +
           +        if (!(claim instanceof StandardContentClaim)) {
           +            // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
           +            // else, just throw an Exception because it is not valid for this Repository
           +            throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does not belong to this Claim Cache");
           +        }
           +
           +        final StandardContentClaim scc = (StandardContentClaim) claim;
           +        final long initialLength = Math.max(0L, scc.getLength());
           +
           +        final OutputStream bcos = out;
           +        return new OutputStream() {
           +            private long bytesWritten = 0L;
           +
           +            @Override
           +            public void write(final int b) throws IOException {
           +                bcos.write(b);
           +                bytesWritten++;
           +                scc.setLength(initialLength + bytesWritten);
           +            }
           +
           +            @Override
           +            public void write(byte[] b, int off, int len) throws IOException {
           +                bcos.write(b, off, len);
           +                bytesWritten += len;
           +                scc.setLength(initialLength + bytesWritten);
           +            }
           +
          — End diff –

          ^^ Probably not the best idea but can be refactored at a later date. Obviously not in scope for this PR.
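           One possible refactoring along the lines discussed above — deferring the claim-length update to flush()/close() instead of calling scc.setLength() on every write — could look roughly like this. This is a minimal, hypothetical sketch with a simplified stand-in claim type, not the actual NiFi classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for StandardContentClaim: just a mutable length,
// initialized to -1 ("never written"), as discussed in this review thread.
class SimpleClaim {
    private long length = -1L;
    void setLength(long length) { this.length = length; }
    long getLength() { return length; }
}

public class DeferredLengthDemo {
    // Wraps a stream and pushes the accumulated length to the claim only on
    // flush()/close(), rather than once per write() call as in the quoted diff.
    static OutputStream deferLength(final OutputStream delegate, final SimpleClaim claim) {
        final long initialLength = Math.max(0L, claim.getLength());
        return new OutputStream() {
            private long bytesWritten = 0L;

            @Override
            public void write(int b) throws IOException {
                delegate.write(b);
                bytesWritten++;
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                delegate.write(b, off, len);
                bytesWritten += len;
            }

            @Override
            public void flush() throws IOException {
                // length is updated once here instead of on every write
                claim.setLength(initialLength + bytesWritten);
                delegate.flush();
            }

            @Override
            public void close() throws IOException {
                flush();
                delegate.close();
            }
        };
    }

    public static void main(String[] args) throws IOException {
        SimpleClaim claim = new SimpleClaim();
        try (OutputStream out = deferLength(new ByteArrayOutputStream(), claim)) {
            out.write(new byte[] {1, 2, 3});
            out.write(4);
        }
        System.out.println(claim.getLength()); // 4
    }
}
```

           The trade-off, of course, is that the claim's length is stale until the stream is flushed, which may or may not be acceptable for the repository's callers — hence "can be refactored at a later date."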

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101100131

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
          +public class ContentClaimWriteCache {
          + private final ContentRepository contentRepo;
          + private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
          + private final Queue<ContentClaim> queue = new LinkedList<>();
          + private final int bufferSize;
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo)

          { + this(contentRepo, 8192); + }

          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize)

          { + this.contentRepo = contentRepo; + this.bufferSize = bufferSize; + }

          +
          + public void reset() throws IOException {
          + try

          { + forEachStream(OutputStream::close); + }

          finally

          { + streamMap.clear(); + queue.clear(); + }

          + }
          +
          + public ContentClaim getContentClaim() throws IOException {
          + final ContentClaim contentClaim = queue.poll();
          + if (contentClaim != null)

          { + contentRepo.incrementClaimaintCount(contentClaim); + return contentClaim; + }

          +
          + final ContentClaim claim = contentRepo.create(false);
          + registerStream(claim);
          + return claim;
          + }
          +
          + private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException

          { + final OutputStream out = contentRepo.write(contentClaim); + final OutputStream buffered = new BufferedOutputStream(out, bufferSize); + final ByteCountingOutputStream bcos = new ByteCountingOutputStream(buffered); + streamMap.put(contentClaim.getResourceClaim(), bcos); + return bcos; + }

          +
          + public OutputStream write(final ContentClaim claim) throws IOException {
          + OutputStream out = streamMap.get(claim.getResourceClaim());
          + if (out == null)

          { + out = registerStream(claim); + }

          +
          + if (!(claim instanceof StandardContentClaim))

          { + // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything + // else, just throw an Exception because it is not valid for this Repository + throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Claim Cache"); + }

          +
          + final StandardContentClaim scc = (StandardContentClaim) claim;
          + final long initialLength = Math.max(0L, scc.getLength());
          +
          + final OutputStream bcos = out;
          + return new OutputStream() {
          + private long bytesWritten = 0L;
          +
          + @Override
          + public void write(final int b) throws IOException

          { + bcos.write(b); + bytesWritten++; + scc.setLength(initialLength + bytesWritten); + }

          +
          + @Override
          + public void write(byte[] b, int off, int len) throws IOException

          { + bcos.write(b, off, len); + bytesWritten += len; + scc.setLength(initialLength + bytesWritten); + }

          +
          — End diff –

          It is -1 I believe when created, in order to distinguish between having never been written to, and having had 0 bytes written to it
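           The sentinel convention described here — a length of -1 meaning "never written," distinct from a zero-byte claim — can be sketched as follows. The helper names are illustrative (in NiFi the field lives on StandardContentClaim); the `Math.max(0L, ...)` normalization is the one used in the quoted diff:

```java
public class ClaimLengthSentinel {
    // -1 marks a claim that has never been written to, so it can be told
    // apart from a claim that had exactly 0 bytes written to it.
    public static final long NEVER_WRITTEN = -1L;

    // Normalization applied before appending new bytes, as in the diff.
    public static long initialLength(long claimLength) {
        return Math.max(0L, claimLength);
    }

    public static boolean hasBeenWritten(long claimLength) {
        return claimLength >= 0L;
    }

    public static void main(String[] args) {
        System.out.println(hasBeenWritten(NEVER_WRITTEN)); // false
        System.out.println(hasBeenWritten(0L));            // true: zero bytes were written
        System.out.println(initialLength(NEVER_WRITTEN));  // 0
        System.out.println(initialLength(42L));            // 42
    }
}
```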

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101099586

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
          +public class ContentClaimWriteCache {
          + private final ContentRepository contentRepo;
          + private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
          + private final Queue<ContentClaim> queue = new LinkedList<>();
          + private final int bufferSize;
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo)

          { + this(contentRepo, 8192); + }

          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize)

          { + this.contentRepo = contentRepo; + this.bufferSize = bufferSize; + }

          +
          + public void reset() throws IOException {
          + try

          { + forEachStream(OutputStream::close); + }

          finally

          { + streamMap.clear(); + queue.clear(); + }

          + }
          +
          + public ContentClaim getContentClaim() throws IOException {
          + final ContentClaim contentClaim = queue.poll();
          + if (contentClaim != null)

          { + contentRepo.incrementClaimaintCount(contentClaim); + return contentClaim; + }

          +
          + final ContentClaim claim = contentRepo.create(false);
          + registerStream(claim);
          + return claim;
          + }
          +
          + private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException

          { + final OutputStream out = contentRepo.write(contentClaim); + final OutputStream buffered = new BufferedOutputStream(out, bufferSize); + final ByteCountingOutputStream bcos = new ByteCountingOutputStream(buffered); + streamMap.put(contentClaim.getResourceClaim(), bcos); + return bcos; + }

          +
          + public OutputStream write(final ContentClaim claim) throws IOException {
          + OutputStream out = streamMap.get(claim.getResourceClaim());
          + if (out == null)

          { + out = registerStream(claim); + }

          +
          + if (!(claim instanceof StandardContentClaim))

          { + // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything + // else, just throw an Exception because it is not valid for this Repository + throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Claim Cache"); + }

          +
          + final StandardContentClaim scc = (StandardContentClaim) claim;
          + final long initialLength = Math.max(0L, scc.getLength());
          +
          + final OutputStream bcos = out;
          + return new OutputStream() {
          + private long bytesWritten = 0L;
          +
          + @Override
          + public void write(final int b) throws IOException

          { + bcos.write(b); + bytesWritten++; + scc.setLength(initialLength + bytesWritten); + }

          +
          + @Override
          + public void write(byte[] b, int off, int len) throws IOException

          { + bcos.write(b, off, len); + bytesWritten += len; + scc.setLength(initialLength + bytesWritten); + }

          +
          — End diff –

          It is correct once I make this change
          ```
          private long bytesWritten = Math.max(0L, scc.getLength());
          ```
          And there is an existing test to prove it. Curious though why would scc.length ever be < 0?

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101098640

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
          +public class ContentClaimWriteCache {
          + private final ContentRepository contentRepo;
          + private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
          + private final Queue<ContentClaim> queue = new LinkedList<>();
          + private final int bufferSize;
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo)

          { + this(contentRepo, 8192); + }

          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize)

          { + this.contentRepo = contentRepo; + this.bufferSize = bufferSize; + }

          +
          + public void reset() throws IOException {
          + try

          { + forEachStream(OutputStream::close); + }

          finally

          { + streamMap.clear(); + queue.clear(); + }

          + }
          +
          + public ContentClaim getContentClaim() throws IOException {
          + final ContentClaim contentClaim = queue.poll();
          + if (contentClaim != null)

          { + contentRepo.incrementClaimaintCount(contentClaim); + return contentClaim; + }

          +
          + final ContentClaim claim = contentRepo.create(false);
          + registerStream(claim);
          + return claim;
          + }
          +
          + private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException

          { + final OutputStream out = contentRepo.write(contentClaim); + final OutputStream buffered = new BufferedOutputStream(out, bufferSize); + final ByteCountingOutputStream bcos = new ByteCountingOutputStream(buffered); + streamMap.put(contentClaim.getResourceClaim(), bcos); + return bcos; + }

          +
          + public OutputStream write(final ContentClaim claim) throws IOException {
          + OutputStream out = streamMap.get(claim.getResourceClaim());
          + if (out == null)

          { + out = registerStream(claim); + }

          +
          + if (!(claim instanceof StandardContentClaim))

          { + // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything + // else, just throw an Exception because it is not valid for this Repository + throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Claim Cache"); + }

          +
          + final StandardContentClaim scc = (StandardContentClaim) claim;
          + final long initialLength = Math.max(0L, scc.getLength());
          +
          + final OutputStream bcos = out;
          + return new OutputStream() {
          + private long bytesWritten = 0L;
          +
          + @Override
          + public void write(final int b) throws IOException

          { + bcos.write(b); + bytesWritten++; + scc.setLength(initialLength + bytesWritten); + }

          +
          + @Override
          + public void write(byte[] b, int off, int len) throws IOException

          { + bcos.write(b, off, len); + bytesWritten += len; + scc.setLength(initialLength + bytesWritten); + }

          +
          — End diff –

          Arguably, yes. But I don't believe it is correct - I think we'd need scc.setLength(++bytesWritten + initialLength), and that just feels a little odd. Similarly, with the write(byte[], int, int) we'd need to also include initialLength there, and I think the expression becomes complex enough that keeping the lines separate is better.
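          The ordering concern comes down to Java's post-increment semantics: `bytesWritten++` evaluates to the value *before* the increment, so `scc.setLength(bytesWritten++)` would record a length that is one byte short. A tiny standalone illustration (hypothetical names, not the NiFi classes):

```java
public class PostIncrementDemo {
    // Mimics the suggested one-liner: record the counter via post-increment.
    // The expression yields the OLD value (0) and only then increments to 1.
    static long recordWithPostIncrement() {
        long bytesWritten = 0L;
        long recorded = bytesWritten++;
        return recorded;
    }

    // Mimics the PR's separate statements: increment first, then record.
    static long recordWithSeparateStatements() {
        long bytesWritten = 0L;
        bytesWritten++;
        return bytesWritten; // the count after the write
    }

    public static void main(String[] args) {
        System.out.println(recordWithPostIncrement());      // one byte short
        System.out.println(recordWithSeparateStatements()); // correct
    }
}
```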

          Show
          githubbot ASF GitHub Bot added a comment - Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1493#discussion_r101098640
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101097703

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
```java
public OutputStream write(final ContentClaim claim) throws IOException {
```

          — End diff –

          I called it write() to mimic the naming used in ContentRepository, as this made the transition of StandardProcessSession a little easier to comprehend. At least, that was my intention.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101097614

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
```java
@Override
public void write(final int b) throws IOException {
    bcos.write(b);
    bytesWritten++;
    scc.setLength(initialLength + bytesWritten);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
    bcos.write(b, off, len);
    bytesWritten += len;
    scc.setLength(initialLength + bytesWritten);
}
```

          — End diff –

          Wouldn't this be a bit simpler?

```java
return new OutputStream() {
    private long bytesWritten = scc.getLength();

    @Override
    public void write(final int b) throws IOException {
        bcos.write(b);
        scc.setLength(bytesWritten++);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        bcos.write(b, off, len);
        scc.setLength(bytesWritten += len);
    }

    . . .
```

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101097507

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java —
          @@ -74,6 +86,295 @@ public void clearRepo() throws IOException {
          }
          }

          +
          + @Test
          + public void testUpdatePerformance() throws IOException, InterruptedException {
          — End diff –

          That is accurate - I did intend to ignore it. Nice catch.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101097304

          — Diff: nifi-docs/src/main/asciidoc/administration-guide.adoc —
          @@ -2074,7 +2074,25 @@ The Provenance Repository contains the information related to Data Provenance. T

```
====
Property Description
-nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository and should only be changed with caution. To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to org.apache.nifi.provenance.VolatileProvenanceRepository.
+nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository.
+Two additional repositories are available as and should only be changed with caution.
```
          — End diff –

          I agree - that was there previously when the only two options were Volatile and Persistent Prov Repo and the note was there to warn that you should know what you're doing when you change to Volatile. This warning can be removed now, I think, since there are two repos that provide persistent storage of the data.
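          For reference, that property is set in nifi.properties. The sketch below is illustrative only: the first two class names come from the quoted documentation text, and WriteAheadProvenanceRepository is assumed here to be the new write-ahead implementation this ticket introduces.

```
# Default: persistent, Lucene-indexed repository
nifi.provenance.repository.implementation=org.apache.nifi.provenance.PersistentProvenanceRepository

# In-memory only - faster, but events are lost on restart or power failure
# nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository

# New write-ahead implementation (assumed class name for NIFI-3356)
# nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
```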

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101094014

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
```java
public OutputStream write(final ContentClaim claim) throws IOException {
```

          — End diff –

          The name write(..) is quite confusing, since there is no writing going on. I think something along the lines of obtainStream(..) . . .

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101088999

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java —
          @@ -74,6 +86,295 @@ public void clearRepo() throws IOException {
          }
          }

          +
          + @Test
          + public void testUpdatePerformance() throws IOException, InterruptedException {
          — End diff –

          I don't think you meant to leave this test as is without @Ignore. It certainly takes time to complete.

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101084757

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
          +public class ContentClaimWriteCache {
          + private final ContentRepository contentRepo;
          + private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
          + private final Queue<ContentClaim> queue = new LinkedList<>();
          + private final int bufferSize;
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo) {
          + this(contentRepo, 8192);
          + }
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) {
          + this.contentRepo = contentRepo;
          + this.bufferSize = bufferSize;
          + }
          +
          + public void reset() throws IOException {
          + try {
          + forEachStream(OutputStream::close);
          + } finally {
          + streamMap.clear();
          + queue.clear();
          + }
          + }
          +
          + public ContentClaim getContentClaim() throws IOException {
          + final ContentClaim contentClaim = queue.poll();
          + if (contentClaim != null) {
          + contentRepo.incrementClaimaintCount(contentClaim);
          + return contentClaim;
          + }
          +
          + final ContentClaim claim = contentRepo.create(false);
          + registerStream(claim);
          + return claim;
          + }
          +
          + private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException {
          — End diff –

          Also, since additional functionality (i.e., counter) of ByteCountingOutputStream is used, do you even need the additional wrapper?
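          The layering question raised here — whether both a buffering wrapper and a counting wrapper are needed — can be illustrated with a minimal sketch. The CountingOutputStream below is a simplified, hypothetical stand-in for NiFi's ByteCountingOutputStream, not the actual class; it shows that the two wrappers serve different purposes (the buffer batches small writes into fewer calls on the underlying stream, while the counter tracks bytes the caller has produced, independent of flushing):

          ```java
          import java.io.BufferedOutputStream;
          import java.io.ByteArrayOutputStream;
          import java.io.FilterOutputStream;
          import java.io.IOException;
          import java.io.OutputStream;

          // Simplified counting wrapper, analogous in spirit to NiFi's ByteCountingOutputStream.
          class CountingOutputStream extends FilterOutputStream {
              private long bytesWritten;

              CountingOutputStream(final OutputStream delegate) {
                  super(delegate);
              }

              @Override
              public void write(final byte[] b, final int off, final int len) throws IOException {
                  out.write(b, off, len);
                  bytesWritten += len;
              }

              @Override
              public void write(final int b) throws IOException {
                  out.write(b);
                  bytesWritten++;
              }

              long getBytesWritten() {
                  return bytesWritten;
              }
          }

          public class WrapperLayeringSketch {
              public static void main(final String[] args) throws IOException {
                  final ByteArrayOutputStream sink = new ByteArrayOutputStream();
                  // Counting sits outside the buffer, so it sees caller bytes immediately,
                  // even while the BufferedOutputStream is still holding them back.
                  final CountingOutputStream counting =
                          new CountingOutputStream(new BufferedOutputStream(sink, 8192));
                  counting.write(new byte[]{1, 2, 3});
                  System.out.println(counting.getBytesWritten()); // 3 bytes counted, none flushed yet
                  counting.flush();
                  System.out.println(sink.size()); // 3 bytes reach the sink after flush
              }
          }
          ```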

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101084265

          — Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.nifi.controller.repository.claim;
          +
          +import java.io.BufferedOutputStream;
          +import java.io.IOException;
          +import java.io.OutputStream;
          +import java.util.HashMap;
          +import java.util.LinkedList;
          +import java.util.Map;
          +import java.util.Queue;
          +
          +import org.apache.nifi.controller.repository.ContentRepository;
          +import org.apache.nifi.stream.io.ByteCountingOutputStream;
          +
          +public class ContentClaimWriteCache {
          + private final ContentRepository contentRepo;
          + private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
          + private final Queue<ContentClaim> queue = new LinkedList<>();
          + private final int bufferSize;
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo) {
          + this(contentRepo, 8192);
          + }
          +
          + public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) {
          + this.contentRepo = contentRepo;
          + this.bufferSize = bufferSize;
          + }
          +
          + public void reset() throws IOException {
          + try {
          + forEachStream(OutputStream::close);
          + } finally {
          + streamMap.clear();
          + queue.clear();
          + }
          + }
          +
          + public ContentClaim getContentClaim() throws IOException {
          + final ContentClaim contentClaim = queue.poll();
          + if (contentClaim != null) {
          + contentRepo.incrementClaimaintCount(contentClaim);
          + return contentClaim;
          + }
          +
          + final ContentClaim claim = contentRepo.create(false);
          + registerStream(claim);
          + return claim;
          + }
          +
          + private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException {
          — End diff –

          Does it have to return ByteCountingOutputStream? It seems like everywhere it is referenced it is used as OutputStream.

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101080280

          — Diff: nifi-docs/src/main/asciidoc/administration-guide.adoc —
          @@ -2074,7 +2074,25 @@ The Provenance Repository contains the information related to Data Provenance. T

          ====
          Property Description
          -
          nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository and should only be changed with caution. To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to org.apache.nifi.provenance.VolatileProvenanceRepository.
          +
          nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository.
          +Two additional repositories are available as and should only be changed with caution.
          +To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure),
          +set this property to org.apache.nifi.provenance.VolatileProvenanceRepository. This leaves a configurable number of Provenance Events in the Java heap, so the number
          +of events that can be retained is very limited. It has been used essentially as a no-op repository and is not recommended.
          — End diff –

          I think all we need to say here is that VolatileProvenanceRepository stores events in memory and is configurable. However, once the event buffer capacity is exceeded, events are evicted and essentially lost, and that is expected. There is no risk here, just different expectations.
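          The eviction semantics described for an in-memory provenance repository can be pictured as a simple bounded buffer: once capacity is reached, the oldest event is silently dropped to make room. This is a toy sketch of that behavior, not the actual VolatileProvenanceRepository implementation:

          ```java
          import java.util.ArrayDeque;
          import java.util.Deque;

          // Toy bounded event buffer: the oldest entry is evicted once capacity is reached.
          class BoundedEventBuffer<E> {
              private final Deque<E> events = new ArrayDeque<>();
              private final int capacity;

              BoundedEventBuffer(final int capacity) {
                  this.capacity = capacity;
              }

              void add(final E event) {
                  if (events.size() == capacity) {
                      events.removeFirst(); // evict oldest; that event is lost by design
                  }
                  events.addLast(event);
              }

              int size() {
                  return events.size();
              }

              E oldest() {
                  return events.peekFirst();
              }
          }

          public class EvictionSketch {
              public static void main(final String[] args) {
                  final BoundedEventBuffer<String> buffer = new BoundedEventBuffer<>(2);
                  buffer.add("event-1");
                  buffer.add("event-2");
                  buffer.add("event-3"); // capacity reached, so event-1 is evicted
                  System.out.println(buffer.oldest()); // event-2
                  System.out.println(buffer.size());   // 2
              }
          }
          ```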

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1493#discussion_r101079356

          — Diff: nifi-docs/src/main/asciidoc/administration-guide.adoc —
          @@ -2074,7 +2074,25 @@ The Provenance Repository contains the information related to Data Provenance. T

          ====
          Property Description
          -
          nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository and should only be changed with caution. To store provenance events in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to org.apache.nifi.provenance.VolatileProvenanceRepository.
          +
          nifi.provenance.repository.implementation The Provenance Repository implementation. The default value is org.apache.nifi.provenance.PersistentProvenanceRepository.
          +Two additional repositories are available as and should only be changed with caution.
          — End diff –

          I am not sure 'should only be changed with caution' is necessary. It sounds like the other two are broken or may break something. In reality they don't. They just behave differently.

          githubbot ASF GitHub Bot added a comment -

          Github user olegz commented on the issue:

          https://github.com/apache/nifi/pull/1493

          @markap14 there seem to be some unintended files that made it into the commit (i.e., /bin/.., .gitignore, etc.)

          markap14 Mark Payne added a comment -

          Joseph Witt Michael Moser I have now rebased against master, as #1475 has been merged. Thanks for getting that merged in, Michael Moser.

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on the issue:

          https://github.com/apache/nifi/pull/1493

          @mosermw good call. We should be able to get #1475 merged pretty easily, I think. Then can rebase my PR.

          githubbot ASF GitHub Bot added a comment -

          Github user joewitt commented on the issue:

          https://github.com/apache/nifi/pull/1493

          I am a +1 to mike's point. @markap14 can you please review/merge #1475 if appropriate. I'd be happy to dive through this once that is in place, but agree we should not push this in and complicate that one first.

          githubbot ASF GitHub Bot added a comment -

          Github user mosermw commented on the issue:

          https://github.com/apache/nifi/pull/1493

          Some neat stuff here, @markap14 . I noticed you changed a couple of the same files as PR #1475. Since #1475 is almost complete, it would be nice to commit that first. Merging that into this PR shouldn't be bad at all, and if we're lucky it will still apply cleanly!

          githubbot ASF GitHub Bot added a comment -

          GitHub user markap14 opened a pull request:

          https://github.com/apache/nifi/pull/1493

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.
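          The threading change described above for MinimalLockingWriteAheadLog — serialize outside the lock, then append to a synchronized OutputStream — can be sketched roughly as follows. The class and method names here are illustrative, not the actual NiFi implementation:

          ```java
          import java.io.ByteArrayOutputStream;
          import java.io.IOException;
          import java.io.OutputStream;
          import java.nio.charset.StandardCharsets;

          // Sketch: expensive serialization happens on the caller's thread with no lock
          // held; only the final append to the shared stream is synchronized, so the
          // critical section shrinks to a single cheap write.
          class SerializedAppender {
              private final OutputStream sharedOut;

              SerializedAppender(final OutputStream sharedOut) {
                  this.sharedOut = sharedOut;
              }

              void append(final String record) throws IOException {
                  // Serialize outside the lock (could be arbitrarily expensive).
                  final byte[] serialized = (record + "\n").getBytes(StandardCharsets.UTF_8);

                  // Only the append itself is serialized across threads, so each
                  // record lands in the log as one contiguous, uninterleaved write.
                  synchronized (sharedOut) {
                      sharedOut.write(serialized);
                  }
              }
          }

          public class WriteAheadSketch {
              public static void main(final String[] args) throws Exception {
                  final ByteArrayOutputStream log = new ByteArrayOutputStream();
                  final SerializedAppender appender = new SerializedAppender(log);

                  // Multiple threads serialize concurrently but append one at a time.
                  final Thread t1 = new Thread(() -> {
                      try { appender.append("update-1"); } catch (IOException ignored) { }
                  });
                  final Thread t2 = new Thread(() -> {
                      try { appender.append("update-2"); } catch (IOException ignored) { }
                  });
                  t1.start(); t2.start();
                  t1.join(); t2.join();

                  System.out.println(log.size()); // two complete records, in some order
              }
          }
          ```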

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you
          to ensure the following steps have been taken:

              1. For all changes:
          • [ ] Is there a JIRA ticket associated with this PR? Is it referenced
            in the commit message?
          • [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [ ] Is your initial contribution a single, squashed commit?
              2. For code changes:
          • [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [ ] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
              3. For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
              4. Note:
                Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/markap14/nifi NIFI-3356

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1493.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1493


          commit babbe4e42a6cc2b0d124691bb398f1ec33d3b8c8
          Author: Mark Payne <markap14@hotmail.com>
          Date: 2016-12-09T15:52:33Z

          NIFI-3356: Initial implementation of writeahead provenance repository

          • The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository,
            a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository:
          • Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles
          • Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream
          • Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration
            now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield
            when processors have no work, this results in slowing down processors that are able to perform work.
          • Allow nifi.properties to specify multiple directories for FlowFile Repository
          • If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the
            herky-jerky queuing that we previously saw at very high rates of FlowFiles.
          • Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information.

          Show
          githubbot ASF GitHub Bot added a comment - GitHub user markap14 opened a pull request: https://github.com/apache/nifi/pull/1493 NIFI-3356 : Initial implementation of writeahead provenance repository The idea behind NIFI-3356 was to improve the efficiency and throughput of the Provenance Repository, as it is often the bottleneck. While testing the newly designed repository, a handful of other, fairly minor, changes were made to improve efficiency as well, as these came to light when testing the new repository: Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache abstraction) in order to avoid continually writing to FileOutputStream when writing many small FlowFiles Updated threading model of MinimalLockingWriteAheadLog - now performs serialization outside of lock and writes to a 'synchronized' OutputStream Change minimum scheduling period for components from 30 microseconds to 1 nanosecond. ScheduledExecutor is very inconsistent with timing of task scheduling. With the bored.yield.duration now present, this value doesn't need to be set to 30 microseconds. This was originally done to avoid processors that had no work from dominating the CPU. However, now that we will yield when processors have no work, this results in slowing down processors that are able to perform work. Allow nifi.properties to specify multiple directories for FlowFile Repository If backpressure is engaged while running a batch of sessions, then stop batch processing earlier. This helps FlowFiles to move through the system much more smoothly instead of the herky-jerky queuing that we previously saw at very high rates of FlowFiles. Added NiFi PID to log message when starting nifi. This was simply an update to the log message that provides helpful information. Thank you for submitting a contribution to Apache NiFi. 
          In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

          For all changes:
          [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
          [ ] Does your PR title start with NIFI-XXXX, where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
          [ ] Is your initial contribution a single, squashed commit?

          For code changes:
          [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          [ ] Have you written or updated unit tests to verify your changes?
          [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0 (http://www.apache.org/legal/resolved.html#category-a)?
          [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

          For documentation related changes:
          [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?

          Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
          You can merge this pull request into a Git repository by running:
          $ git pull https://github.com/markap14/nifi NIFI-3356
          Alternatively you can review and apply these changes as the patch at:
          https://github.com/apache/nifi/pull/1493.patch
          To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1493

          commit babbe4e42a6cc2b0d124691bb398f1ec33d3b8c8
          Author: Mark Payne <markap14@hotmail.com>
          Date: 2016-12-09T15:52:33Z
          NIFI-3356: Initial implementation of write-ahead provenance repository
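          One of the changes described in the pull request above wraps content-claim writes in a BufferedOutputStream so that many small FlowFile writes coalesce before reaching the underlying FileOutputStream. The effect can be illustrated with a small, self-contained sketch; the class and method names below are illustrative only and are not NiFi's actual ClaimCache code:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Illustrative sketch only -- not NiFi's ClaimCache implementation.
// Counts how many times the underlying stream is hit, standing in for the
// per-write cost of going straight to a FileOutputStream.
public class BufferedClaimWriteDemo {

    static class CountingOutputStream extends FilterOutputStream {
        int writeCalls = 0;

        CountingOutputStream(OutputStream out) {
            super(out);
        }

        @Override
        public void write(int b) throws IOException {
            writeCalls++;
            out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            writeCalls++;
            out.write(b, off, len);
        }
    }

    // Writes 1,000 small "FlowFile contents" and reports how many write calls
    // actually reached the underlying stream.
    static int countUnderlyingWrites(boolean buffered) {
        CountingOutputStream counting = new CountingOutputStream(new ByteArrayOutputStream());
        OutputStream target = buffered ? new BufferedOutputStream(counting, 8192) : counting;
        byte[] smallContent = new byte[64];
        try {
            for (int i = 0; i < 1000; i++) {
                target.write(smallContent, 0, smallContent.length);
            }
            target.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counting.writeCalls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered: " + countUnderlyingWrites(false)); // 1000 underlying writes
        System.out.println("buffered:   " + countUnderlyingWrites(true));  // one write per 8 KB buffer flush
    }
}
```

          With buffering, the 64,000 bytes reach the wrapped stream in a handful of 8 KB flushes rather than 1,000 individual writes - the same kind of win the PR describes when many small FlowFiles are written through one content claim.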
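          The MinimalLockingWriteAheadLog change mentioned above - perform serialization outside the lock, then write to a 'synchronized' OutputStream - amounts to keeping the critical section as small as possible. A minimal sketch of that threading pattern, using illustrative names rather than NiFi's actual write-ahead log API:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only -- these names are not NiFi's actual
// MinimalLockingWriteAheadLog API. The pattern: serialize the record on the
// calling thread, outside any lock, then hold the lock only for the short
// append to the shared journal stream.
public class WriteAheadAppender {
    private final OutputStream journal;   // shared, append-only journal

    public WriteAheadAppender(OutputStream journal) {
        this.journal = journal;
    }

    public void append(String record) {
        // Potentially expensive serialization happens per-thread, unsynchronized.
        byte[] serialized = (record + "\n").getBytes(StandardCharsets.UTF_8);

        // Only the write itself is guarded, so writer threads contend only for
        // the brief moment it takes to copy bytes into the stream.
        synchronized (journal) {
            try {
                journal.write(serialized);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

          Because each record reaches the stream in a single synchronized write, records from concurrent threads are interleaved whole rather than byte-by-byte, and the lock is never held while serialization work is being done.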

            People

            • Assignee: markap14 (Mark Payne)
            • Reporter: markap14 (Mark Payne)
            • Votes: 0
            • Watchers: 8
