  1. Spark
  2. SPARK-3954

Optimization to FileInputDStream


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 1.1.0
    • Fix Version/s: 1.1.1, 1.2.0
    • Component/s: DStreams
    • Labels: None

      Description

      When converting files to RDDs, the Spark source traverses the file sequence three times:
      1. files.map(...)
      2. files.zip(fileRDDs)
      3. files.zip(fileRDDs).foreach(...)
      Reduce these three traversals to one.

      Spark source code:
      private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
        val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
        files.zip(fileRDDs).foreach { case (file, rdd) => {
          if (rdd.partitions.size == 0) {
            logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
              "files that have been \"moved\" to the directory assigned to the file stream. " +
              "Refer to the streaming programming guide for more details.")
          }
        }}
        new UnionRDD(context.sparkContext, fileRDDs)
      }
      // -----------------------------------------------------------------------------------
      Modified code:
      private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
        // Build each RDD and check it for emptiness in the same comprehension.
        val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file))
          yield {
            if (rdd.partitions.size == 0) {
              logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
                "files that have been \"moved\" to the directory assigned to the file stream. " +
                "Refer to the streaming programming guide for more details.")
            }
            rdd
          }
        new UnionRDD(context.sparkContext, fileRDDs)
      }
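
      The difference between the two shapes can be sketched without a Spark dependency. The block below is only an illustration under stated assumptions: FakeRDD, load, and the warn callback are hypothetical stand-ins for the RDD, for context.sparkContext.newAPIHadoopFile, and for logError. It uses an explicit single map rather than a for-comprehension, because the Scala compiler desugars a comprehension containing a midstream definition such as `rdd = ...` into two map calls, so the comprehension form may not be a true single traversal.

```scala
// Hypothetical stand-in for an RDD: only the partition count matters here.
final case class FakeRDD(path: String, partitions: Int)

object SinglePassSketch {
  // Hypothetical loader standing in for context.sparkContext.newAPIHadoopFile.
  // Assumption for the sketch: paths ending in ".empty" yield zero partitions.
  def load(path: String): FakeRDD =
    FakeRDD(path, if (path.endsWith(".empty")) 0 else 1)

  // Original shape: one map to build the RDDs, then zip + foreach to check them.
  def multiPass(files: Seq[String], warn: String => Unit): Seq[FakeRDD] = {
    val rdds = files.map(load)
    files.zip(rdds).foreach { case (file, rdd) =>
      if (rdd.partitions == 0) warn(file)
    }
    rdds
  }

  // Proposed shape: a single map that loads, checks, and returns each element.
  def singlePass(files: Seq[String], warn: String => Unit): Seq[FakeRDD] =
    files.map { file =>
      val rdd = load(file)
      if (rdd.partitions == 0) warn(file)
      rdd
    }
}
```

      Both functions return the same sequence and report the same empty files; only the number of traversals over the file sequence differs.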

            People

            • Assignee: Unassigned
            • Reporter: surq 宿荣全