Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Not A Bug
-
None
-
None
-
None
Description
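In BucketingSink#snapshotState, bucketState.pendingFiles is replaced with a new, empty list right after it is recorded for the checkpoint (see the comment in this method):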
public void snapshotState(FunctionSnapshotContext context) throws Exception { Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); restoredBucketStates.clear(); synchronized (state.bucketStates) { int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) { BucketState<T> bucketState = bucketStateEntry.getValue(); if (bucketState.isWriterOpen) { bucketState.currentFileValidLength = bucketState.writer.flush(); } synchronized (bucketState.pendingFilesPerCheckpoint) { bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); } //This operation will make this collection prematurely emptied bucketState.pendingFiles = new ArrayList<>(); } restoredBucketStates.add(state); if (LOG.isDebugEnabled()) { LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state); } } }