FLINK-10840

BucketingSink incorrectly clears the pendingFiles List

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Bug
    • Component/s: Connectors / Common

    Description

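      In BucketingSink#snapshotState, bucketState.pendingFiles is replaced with a new, empty list immediately after it is stored in pendingFilesPerCheckpoint (see the comment in the method):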

      public void snapshotState(FunctionSnapshotContext context) throws Exception {
         Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
      
         restoredBucketStates.clear();
      
         synchronized (state.bucketStates) {
            int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
      
            for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
               BucketState<T> bucketState = bucketStateEntry.getValue();
      
               if (bucketState.isWriterOpen) {
                  bucketState.currentFileValidLength = bucketState.writer.flush();
               }
      
               synchronized (bucketState.pendingFilesPerCheckpoint) {
                  bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
               }
               // Reported concern: this reassignment empties the pending-files collection prematurely
               bucketState.pendingFiles = new ArrayList<>();
            }
            restoredBucketStates.add(state);
      
            if (LOG.isDebugEnabled()) {
               LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
            }
         }
      }
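
The ticket was resolved as Not A Bug, although this page does not record the reasoning. One plausible reading, sketched here under stated assumptions, is that `Map.put` stores a reference to the existing list, so assigning a fresh `ArrayList` to the field afterwards leaves the stored list untouched. The names `PendingFilesDemo`, `DemoBucketState`, and the file names are hypothetical stand-ins for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingFilesDemo {

    // Hypothetical, simplified stand-in for BucketState: only the two fields discussed in the ticket.
    static class DemoBucketState {
        List<String> pendingFiles = new ArrayList<>();
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    // Mirrors the two statements from snapshotState that the ticket questions.
    static void snapshot(DemoBucketState state, long checkpointId) {
        // The map now holds a reference to the current list object.
        state.pendingFilesPerCheckpoint.put(checkpointId, state.pendingFiles);
        // Rebinding the field points it at a new list; the old list is not cleared.
        state.pendingFiles = new ArrayList<>();
    }

    public static void main(String[] args) {
        DemoBucketState state = new DemoBucketState();
        state.pendingFiles.add("part-0-0.pending");
        state.pendingFiles.add("part-0-1.pending");

        snapshot(state, 1L);

        // The list stored under checkpoint 1 still contains both files.
        System.out.println(state.pendingFilesPerCheckpoint.get(1L)); // [part-0-0.pending, part-0-1.pending]
        // The field now refers to a separate, empty list for files created after the checkpoint.
        System.out.println(state.pendingFiles); // []
    }
}
```

Had the code called `pendingFiles.clear()` instead of assigning a new list, the entry stored in the map would have been emptied as well, because both names would still refer to the same object.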
      


          People

            yanghua vinoyang