Spark / SPARK-25858

Passing Field Metadata to Parquet


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Later
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Component/s: Input/Output
    • Labels: None

    Description

      Problem Statement

      The Spark WriteSupport class for Parquet is hardcoded to org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport and is not configurable. Currently, this class does not carry the field metadata in StructType over to Parquet's MessageType. However, Parquet column encryption (PARQUET-1396, PARQUET-1178) requires that field metadata inside MessageType, so that the metadata can be used to control column encryption.

      Technical Solution

      1. Extend the SparkToParquetSchemaConverter class and override its convert() method to carry over the field metadata.
      2. Extend ParquetWriteSupport to use the extended converter from #1. Extending, rather than changing, the built-in WriteSupport mitigates risk.
      3. Change Spark to make the WriteSupport class configurable, so users can select the extended WriteSupport from #2. The default remains org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.
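      Once step 3 is in place, opting in could look roughly like the sketch below. ParquetOutputFormat.WRITE_SUPPORT_CLASS is the existing "parquet.write.support.class" Hadoop key; CryptoParquetWriteSupport is the extension proposed in this issue, so this is an assumed usage, not a final API.

          // Hedged sketch: select the extended WriteSupport via the existing
          // Parquet Hadoop key, assuming step 3 makes the class configurable.
          import org.apache.parquet.hadoop.ParquetOutputFormat

          spark.sparkContext.hadoopConfiguration.set(
            ParquetOutputFormat.WRITE_SUPPORT_CLASS,
            classOf[CryptoParquetWriteSupport].getName)

          df.write.parquet("/path/to/output")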

      Technical Details


      Extend SparkToParquetSchemaConverter class

       class SparkToParquetMetadataSchemaConverter extends SparkToParquetSchemaConverter {

         override def convert(catalystSchema: StructType): MessageType = {
           Types
             .buildMessage()
             .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
             .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
         }

         private def convertFieldWithMetadata(field: StructField): Type = {
           // Wrap the converted field in an ExtType so the Spark field metadata
           // can be attached at the Parquet level.
           val extField = new ExtType[Any](convertField(field))
           val metaBuilder = new MetadataBuilder().withMetadata(field.metadata)
           val metaData = metaBuilder.getMap
           extField.setMetadata(metaData)
           extField
         }
       }

      Extend ParquetWriteSupport

       class CryptoParquetWriteSupport extends ParquetWriteSupport {

         override def init(configuration: Configuration): WriteContext = {
           val converter = new SparkToParquetMetadataSchemaConverter(configuration)
           createContext(configuration, converter)
         }
       }
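      For context, the field metadata that this extension carries over originates on the Spark side in StructField.metadata. A hedged sketch of how a caller might tag a column before writing; the "encrypt" key is purely illustrative and not an API defined by this issue:

          // Illustrative only: attach per-field metadata that the extended
          // converter would carry into Parquet's MessageType. The "encrypt"
          // key is a made-up example.
          import org.apache.spark.sql.types.MetadataBuilder

          val colMeta = new MetadataBuilder().putString("encrypt", "true").build()
          val tagged = df.withColumn("ssn", df("ssn").as("ssn", colMeta))
          tagged.write.parquet("/path/to/output")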

      Make WriteSupport configurable

       class ParquetFileFormat {

         override def prepareWrite(...) {
           ...
           // Only install the default WriteSupport when the user has not
           // configured one, keeping the existing behavior as the default.
           if (conf.get(ParquetOutputFormat.WRITE_SUPPORT_CLASS) == null) {
             ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
           }
           ...
         }
       }

      Verification

      The ParquetHelloWorld.java file in the GitHub repository parquet-writesupport-extensions contains a sample that verifies passing down the field metadata and performing column encryption.

      Dependencies

      • PARQUET-1178
      • PARQUET-1396
      • PARQUET-1397


      People

        Assignee: Unassigned
        Reporter: Xinli Shang (shangx@uber.com)
        Votes: 0
        Watchers: 3
