Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28781

Unneccesary persist in PeriodicCheckpointer.update()

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Not A Problem
    • 3.0.0
    • None
    • Spark Core
    • None

    Description

      Once the fuction update() is called, the RDD newData is persisted at line 82. However, only when meeting the checking point condition (at line 94), the persisted rdd newData would be used for the second time in the API checkpoint() (do checkpoint at line 97). In other conditions, newData will only be used once and it is unnecessary to persist the rdd in that case. Although the persistedQueue will be checked to avoid too many unnecessary cached data, it would be better to avoid every unnecessary persist operation.

      def update(newData: T): Unit = {
          persist(newData)
          persistedQueue.enqueue(newData)
          // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
          // Users should call [[update()]] when a new Dataset has been created,
          // before the Dataset has been materialized.
          while (persistedQueue.size > 3) {
            val dataToUnpersist = persistedQueue.dequeue()
            unpersist(dataToUnpersist)
          }
          updateCount += 1
      
          // Handle checkpointing (after persisting)
          if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
            && sc.getCheckpointDir.nonEmpty) {
            // Add new checkpoint before removing old checkpoints.
            checkpoint(newData)
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            spark_cachecheck IcySanwitch
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: