FLINK-4534

Lack of synchronization in BucketingSink#restoreState()

Details

    Description

      Iteration over state.bucketStates is guarded by synchronization in other methods, but not in the following loop in restoreState():

          for (BucketState<T> bucketState : state.bucketStates.values()) {
      

      and the following loop in close():

          for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
            closeCurrentPartFile(entry.getValue());
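
      A minimal sketch of the guarded form of this loop, offered as an illustration and not as the actual Flink patch. The class BucketRegistrySketch, the use of String in place of BucketState<T>, and the choice of the map itself as the monitor are all assumptions; the real sink may lock on a different object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's state holder; not Flink code.
class BucketRegistrySketch {
    // Plays the role of state.bucketStates; String replaces BucketState<T>.
    private final Map<String, String> bucketStates = new HashMap<>();

    void put(String bucketPath, String bucketState) {
        synchronized (bucketStates) {
            bucketStates.put(bucketPath, bucketState);
        }
    }

    // Mirrors the loop in close(): every entry is visited while the lock is
    // held, so a concurrent put() cannot modify the map mid-iteration.
    List<String> closeAll() {
        List<String> closed = new ArrayList<>();
        synchronized (bucketStates) {
            for (Map.Entry<String, String> entry : bucketStates.entrySet()) {
                // Stand-in for closeCurrentPartFile(entry.getValue()).
                closed.add(entry.getValue());
            }
        }
        return closed;
    }
}
```

      The point of the sketch is only that readers and writers agree on one monitor; which object serves as that monitor is a design choice of the real class.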
      

      With respect to bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752:

            Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
            LOG.debug("Moving pending files to final location on restore.");
            for (Long pastCheckpointId : pastCheckpointIds) {
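
      One way to make this loop safe, sketched under assumptions and not taken from the Flink source: since keySet() returns a live view of the map, copy the keys while holding a lock and iterate over the copy. The class PendingFilesSketch, the Map<Long, List<String>> field type, and the monitor object are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a single bucket's state; not Flink code.
class PendingFilesSketch {
    // Plays the role of bucketState.pendingFilesPerCheckpoint.
    private final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

    void addPending(long checkpointId, String file) {
        synchronized (pendingFilesPerCheckpoint) {
            pendingFilesPerCheckpoint
                .computeIfAbsent(checkpointId, id -> new ArrayList<>())
                .add(file);
        }
    }

    // Returns a snapshot of the checkpoint ids taken under the lock, so the
    // caller can iterate without seeing later changes to the map.
    List<Long> pastCheckpointIds() {
        synchronized (pendingFilesPerCheckpoint) {
            return new ArrayList<>(pendingFilesPerCheckpoint.keySet());
        }
    }
}
```

      Iterating over the snapshot trades a small copy for not holding the lock during the file moves; holding the lock for the whole loop, as other methods do for state.bucketStates, would be the other option.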
      

          People

            mingleizhang zhangminglei
            yuzhihong@gmail.com Ted Yu