SPARK-21866

SPIP: Image support in Spark



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: ML


      Background and motivation

      As Apache Spark is being used more and more in the industry, some new use cases are emerging for different data formats beyond the traditional SQL types or the numerical types (vectors and matrices). Deep Learning applications commonly deal with image processing. A number of projects add some Deep Learning capabilities to Spark (see list below), but they struggle to communicate with each other or with MLlib pipelines because there is no standard way to represent an image in Spark DataFrames. We propose to federate efforts for representing images in Spark by defining a representation that caters to the most common needs of users and library developers.

      This SPIP proposes a specification to represent images in Spark DataFrames and Datasets (based on existing industrial standards), and an interface for loading sources of images. It is not meant to be a full-fledged image processing library, but rather the core description that other libraries and users can rely on. Several packages already offer various processing facilities for transforming images or doing more complex operations, and each has various design tradeoffs that make them better as standalone solutions.

      This project is a joint collaboration between Microsoft and Databricks, which have been testing this design in two open source packages: MMLSpark and Deep Learning Pipelines.

      The proposed image format is an in-memory, decompressed representation that targets low-level applications. It uses significantly more memory than compressed representations such as JPEG or PNG, but it allows easy communication with popular image processing libraries and has no decoding overhead.

      Target users and personas:

      Data scientists, data engineers, library developers.
      The following libraries define primitives for loading and representing images, and will gain from a common interchange format (in alphabetical order):

      • BigDL
      • DeepLearning4J
      • Deep Learning Pipelines
      • MMLSpark
      • TensorFlow (Spark connector)
      • TensorFlowOnSpark
      • TensorFrames
      • Thunder


      • Simple representation of images in Spark DataFrames, based on pre-existing industrial standards (OpenCV)
      • This format should eventually allow the development of high-performance integration points with image processing libraries such as libOpenCV, Google TensorFlow, CNTK, and other C libraries.
      • The reader should be able to read popular formats of images from distributed sources.

      Non-Goals:


      Images are a versatile medium and encompass a very wide range of formats and representations. This SPIP explicitly aims at the most common use case in the industry currently: multi-channel matrices of binary, int32, int64, float or double data that can fit comfortably in the heap of the JVM:

      • the total size of an image should be restricted to less than 2GB (roughly)
      • the meaning of color channels is application-specific and is not mandated by the standard (in line with the OpenCV standard)
      • specialized formats used in meteorology, the medical field, etc. are not supported
      • this format is specialized to images and does not attempt to solve the more general problem of representing n-dimensional tensors in Spark
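
      As a rough illustration of the size restriction above, the uncompressed footprint of an image is height × width × channels × bytes per element. The sketch below assumes the limit corresponds to the maximum length of a JVM byte array (2^31 - 1); the helper names image_nbytes and fits_in_jvm_array are hypothetical and are not part of the proposal.

```python
# Hypothetical helpers. Assumption: the "roughly 2GB" limit is the
# maximum length of a JVM byte array.
JVM_MAX_ARRAY_BYTES = 2**31 - 1


def image_nbytes(height, width, n_channels, bytes_per_element=1):
    """Size in bytes of a decompressed, packed image."""
    return height * width * n_channels * bytes_per_element


def fits_in_jvm_array(height, width, n_channels, bytes_per_element=1):
    """True if the packed image fits in a single JVM byte array."""
    size = image_nbytes(height, width, n_channels, bytes_per_element)
    return size <= JVM_MAX_ARRAY_BYTES


# A 1920x1080, 3-channel, 8-bit image takes about 6.2 MB uncompressed.
print(image_nbytes(1080, 1920, 3))         # 6220800
# A 30000x30000, 3-channel, 8-bit image (2.7e9 bytes) exceeds the limit.
print(fits_in_jvm_array(30000, 30000, 3))  # False
```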

      Proposed API changes

      We propose to add a new package in the package structure, under the MLlib project:

      Data format

      We propose to add the following structure:

      imageSchema = StructType([

      • StructField("mode", StringType(), False),
        • The exact representation of the data.
        • The values follow the OpenCV convention. The type carries both "depth" and "number of channels" information: for example, type "CV_8UC3" means "3-channel unsigned bytes". BGRA format would be CV_8UC4 (value 24 in the OpenCV type table) with the channel order specified by convention.
        • The exact channel ordering and meaning of each channel are dictated by convention. By default, the order is BGR (3 channels) and BGRA (4 channels), matching the storage order of the data field.
        • If the image fails to load, the value is the empty string "".
      • StructField("origin", StringType(), True),
        • Some information about the origin of the image. The content of this is application-specific.
        • When the image is loaded from files, users should expect to find the file name in this field.
      • StructField("height", IntegerType(), False),
        • The height of the image, in pixels.
        • If the image fails to load, the value is -1.
      • StructField("width", IntegerType(), False),
        • The width of the image, in pixels.
        • If the image fails to load, the value is -1.
      • StructField("nChannels", IntegerType(), False),
        • The number of channels in this image: typically 1 (grayscale), 3 (BGR), or 4 (BGRA).
        • If the image fails to load, the value is -1.
      • StructField("data", BinaryType(), False)
        • Packed array content. Due to an implementation limitation, it currently cannot store more than 2 billion pixels.
        • The data is stored in a pixel-by-pixel BGR row-wise order. This follows the OpenCV convention.
        • If the image fails to load, this array is empty.
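
      The packed layout described above can be sketched as an index computation. This is a minimal illustration that assumes 8-bit channels (one byte per channel value) and represents a row of the DataFrame as a plain dictionary; pixel_at is a hypothetical helper, not part of the proposed API.

```python
def pixel_at(image, row, col):
    """Return the channel values of one pixel from a packed, row-wise image.

    Assumes 8-bit channels, so each pixel occupies nChannels consecutive bytes.
    """
    if not (0 <= row < image["height"] and 0 <= col < image["width"]):
        raise IndexError("pixel coordinates out of range")
    n = image["nChannels"]
    start = (row * image["width"] + col) * n
    return tuple(image["data"][start:start + n])


# A 1x2 image in BGR order: a pure blue pixel followed by a pure red pixel.
image = {
    "mode": "CV_8UC3",
    "origin": "example.png",  # hypothetical file name
    "height": 1,
    "width": 2,
    "nChannels": 3,
    "data": bytes([255, 0, 0, 0, 0, 255]),
}
print(pixel_at(image, 0, 0))  # (255, 0, 0): blue in BGR order
print(pixel_at(image, 0, 1))  # (0, 0, 255): red in BGR order
```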

      For more information about image types, here is an OpenCV guide on types: http://docs.opencv.org/2.4/modules/core/doc/intro.html#fixed-pixel-types-limited-use-of-templates
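
      To make the naming scheme concrete, the sketch below splits an OpenCV type name into its depth and channel count and computes the integer type code as depth + ((channels - 1) << 3), which is how OpenCV's CV_MAKETYPE macro combines them. The function parse_mode is a hypothetical helper, and only the named 1 to 4 channel types are handled.

```python
import re

# Depth codes as defined by OpenCV (CV_8U = 0 through CV_64F = 6).
DEPTH_CODES = {"8U": 0, "8S": 1, "16U": 2, "16S": 3, "32S": 4, "32F": 5, "64F": 6}

# Only the named 1- to 4-channel types are recognised in this sketch.
MODE_PATTERN = re.compile(r"CV_(8U|8S|16U|16S|32S|32F|64F)C([1-4])")


def parse_mode(mode):
    """Split a name such as 'CV_8UC3' into (depth, channels, type code)."""
    match = MODE_PATTERN.fullmatch(mode)
    if match is None:
        raise ValueError(f"not an OpenCV type name: {mode!r}")
    depth, channels = match.group(1), int(match.group(2))
    code = DEPTH_CODES[depth] + ((channels - 1) << 3)
    return depth, channels, code


print(parse_mode("CV_8UC3"))  # ('8U', 3, 16)
print(parse_mode("CV_8UC4"))  # ('8U', 4, 24)
```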

      The reference implementation provides some functions to convert popular formats (JPEG, PNG, etc.) to the image specification above, and some functions to verify if an image is valid.
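
      One possible shape for such a validity check, based only on the failure conventions listed in the field descriptions (empty mode, dimensions of -1, empty data), is sketched below. The function is_valid_image is hypothetical and is not the reference implementation's function; the exact length check is applied only to 8-bit modes, which is an assumption of this sketch.

```python
def is_valid_image(image):
    """Return True if the record does not look like a failed load."""
    if image["mode"] == "":
        return False
    height, width, channels = image["height"], image["width"], image["nChannels"]
    if min(height, width, channels) <= 0:
        return False
    # Assumption: for 8-bit modes, one byte is stored per channel value.
    if image["mode"].startswith(("CV_8U", "CV_8S")):
        return len(image["data"]) == height * width * channels
    # For wider element types this sketch only checks that data is present.
    return len(image["data"]) > 0


failed = {"mode": "", "origin": "broken.jpg", "height": -1, "width": -1,
          "nChannels": -1, "data": b""}
ok = {"mode": "CV_8UC1", "origin": "dot.png", "height": 1, "width": 1,
      "nChannels": 1, "data": b"\x7f"}
print(is_valid_image(failed))  # False
print(is_valid_image(ok))      # True
```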

      Image ingest API

      We propose the following function to load images from a remote distributed source as a DataFrame. Here is the signature in Scala; the Python interface is similar. For compatibility with Java, this function should be made available through a builder pattern or through the DataSource API. The exact mechanics can be discussed during implementation; the goal of the proposal below is to specify the behavior.

      def readImages(
          path: String,
          session: SparkSession = null,
          recursive: Boolean = false,
          numPartitions: Int = 0,
          dropImageFailures: Boolean = false,
          // Experimental options
          sampleRatio: Double = 1.0): DataFrame

      The schema of the returned DataFrame should be the structure type above, with the origin field of every row filled with the file name.

      Mandatory parameters:

      • path: a directory on a file system that contains images

      Optional parameters:

      • session (SparkSession, default null): the Spark session to use to create the DataFrame. If not provided, the current default session is obtained via SparkSession.builder.getOrCreate().
      • recursive (bool, default false): if false, load only the images at the top level of the directory; if true, also descend into subdirectories.
      • numPartitions (int, default 0): the number of partitions of the final DataFrame. With the default value, the default number of partitions from Spark is used.
      • dropImageFailures (bool, default false): if true, drop the files that failed to load. If false, those files are kept as rows carrying the failure values described in the data format (empty mode, dimensions of -1, empty data).
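
      The semantics of recursive and dropImageFailures can be illustrated on a local directory using only the Python standard library. This is not Spark code: list_image_records, failed_record, and the caller-supplied decode function are hypothetical stand-ins, and an actual implementation would return a DataFrame and decode files with an image library.

```python
from pathlib import Path


def failed_record(origin):
    """Record for a file that could not be decoded (failure conventions above)."""
    return {"mode": "", "origin": origin, "height": -1, "width": -1,
            "nChannels": -1, "data": b""}


def list_image_records(path, decode, recursive=False, drop_image_failures=False):
    """Yield one record per file under `path`.

    `decode` is a caller-supplied function mapping raw bytes to a record
    dict, or to None when the bytes cannot be decoded.
    """
    root = Path(path)
    candidates = root.rglob("*") if recursive else root.glob("*")
    for file_path in sorted(p for p in candidates if p.is_file()):
        record = decode(file_path.read_bytes())
        if record is None:
            if drop_image_failures:
                continue  # silently skip files that failed to load
            record = failed_record(str(file_path))
        else:
            record = dict(record, origin=str(file_path))
        yield record
```

      Because the function is a generator, no file is listed or read until the result is iterated, which mirrors the laziness expected from the real reader.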

      Experimental parameters that may be deprecated quickly. These would be useful to have but are not critical for a first cut:

      • sampleRatio (float, in (0, 1], default 1): if less than 1, returns a fraction of the data. There is no statistical guarantee about how the sampling is performed. This proved to be very helpful for fast prototyping. Marked as experimental since it should be pushed to the Spark core.
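
      Since the proposal gives no guarantee about the sampling method, the sketch below shows just one simple option: keeping each file independently with probability sampleRatio. The function sample_paths and its seed argument are hypothetical and are not part of the proposed signature.

```python
import random


def sample_paths(paths, sample_ratio=1.0, seed=None):
    """Keep each path independently with probability `sample_ratio`."""
    if not 0.0 < sample_ratio <= 1.0:
        raise ValueError("sample_ratio must be in (0, 1]")
    if sample_ratio == 1.0:
        return list(paths)  # no sampling requested
    rng = random.Random(seed)
    return [p for p in paths if rng.random() < sample_ratio]


paths = [f"img_{i}.png" for i in range(1000)]
subset = sample_paths(paths, sample_ratio=0.1, seed=42)
# Roughly 100 paths are kept; the exact count depends on the seed.
print(len(subset))
```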

      The implementation is expected to be in Scala for performance, with a wrapper for Python.
      This function should be lazy to the extent possible: it should not trigger access to the data when called. Ideally, any file system supported by Spark should be supported when loading images. There may be restrictions for some options such as zip files, etc.

      The reference implementation also has some experimental options (undocumented here).

      Reference implementation

      A reference implementation is available as an open-source Spark package in this repository (Apache 2.0 license):

      This Spark package will also be published in binary form on spark-packages.org.

      Comments about the API should be addressed in this ticket.

      Optional Rejected Designs

      The use of User-Defined Types was considered. It adds some burden to the implementation of various languages and does not provide significant advantages.





              imatiach Ilya Matiach
              timhunter Timothy Hunter
              Joseph K. Bradley Joseph K. Bradley